diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java index b50f8a7afd..187a7118d5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java @@ -23,20 +23,20 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.function.SupplierWithException; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import lombok.Getter; import javax.annotation.Nullable; /** An implementation of JobAutoscalerContext for Kubernetes. */ public class KubernetesJobAutoScalerContext extends JobAutoScalerContext { - private final AbstractFlinkResource resource; - - private final KubernetesClient kubernetesClient; + @Getter private final FlinkResourceContext resourceContext; public KubernetesJobAutoScalerContext( @Nullable JobID jobID, @@ -44,24 +44,22 @@ public KubernetesJobAutoScalerContext( Configuration configuration, MetricGroup metricGroup, SupplierWithException, Exception> restClientSupplier, - AbstractFlinkResource resource, - KubernetesClient kubernetesClient) { + FlinkResourceContext resourceContext) { super( - ResourceID.fromResource(resource), + ResourceID.fromResource(resourceContext.getResource()), jobID, jobStatus, configuration, metricGroup, restClientSupplier); - this.resource = resource; - this.kubernetesClient = kubernetesClient; + this.resourceContext = resourceContext; } public AbstractFlinkResource getResource() { - return resource; + return resourceContext.getResource(); } public KubernetesClient getKubernetesClient() { - return kubernetesClient; + return resourceContext.getKubernetesClient(); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java index 6bb7a9494f..d20f783d2e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java @@ -23,8 +23,9 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; +import javax.annotation.Nullable; + import java.util.Map; -import java.util.TreeMap; /** The Kubernetes implementation for applying parallelism overrides. */ public class KubernetesScalingRealizer @@ -33,14 +34,34 @@ public class KubernetesScalingRealizer @Override public void realize( KubernetesJobAutoScalerContext context, Map parallelismOverrides) { - // Make sure the keys are sorted via TreeMap to prevent changing the spec when none of the - // entries changed but the key order is different! - parallelismOverrides = new TreeMap<>(parallelismOverrides); + context.getResource() .getSpec() .getFlinkConfiguration() .put( PipelineOptions.PARALLELISM_OVERRIDES.key(), - ConfigurationUtils.convertValue(parallelismOverrides, String.class)); + getOverrideString(context, parallelismOverrides)); + } + + @Nullable + private static String getOverrideString( + KubernetesJobAutoScalerContext context, Map newOverrides) { + if (context.getResource().getStatus().getReconciliationStatus().isBeforeFirstDeployment()) { + return ConfigurationUtils.convertValue(newOverrides, String.class); + } + + var conf = context.getResourceContext().getObserveConfig(); + var currentOverrides = + conf.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).orElse(Map.of()); + + // Check that the overrides actually changed and not just the String representation. + // This way we prevent reconciling a NOOP config change which would unnecessarily redeploy + // the pipeline. + if (currentOverrides.equals(newOverrides)) { + // If overrides are identical, use the previous string as-is. + return conf.getValue(PipelineOptions.PARALLELISM_OVERRIDES); + } else { + return ConfigurationUtils.convertValue(newOverrides, String.class); + } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java index e2b70f21e7..41b029ea4d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java @@ -81,8 +81,7 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() { conf, getResourceMetricGroup(), () -> getFlinkService().getClusterClient(conf), - resource, - getKubernetesClient()); + this); } @Nullable @@ -104,6 +103,7 @@ private JobStatus generateJobStatusEnum(CommonStatus status) { * * @return Config currently deployed. */ + @Nullable public Configuration getObserveConfig() { if (observeConfig != null) { return observeConfig; @@ -118,6 +118,7 @@ public Configuration getObserveConfig() { * @param spec Spec for which the config should be created. * @return Deployment configuration. */ + @Nullable public abstract Configuration getDeployConfig(AbstractFlinkSpec spec); /** diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 5bef0ba7df..1113739c72 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -186,6 +186,7 @@ private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java index dda7ba0a43..be6f38c333 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.junit.jupiter.api.Test; @@ -30,23 +31,59 @@ public class KubernetesScalingRealizerTest { @Test - public void testAutoscalerOverridesVertexIdsAreSorted() { + public void testApplyOverrides() { + KubernetesJobAutoScalerContext ctx = + TestingKubernetesAutoscalerUtils.createContext("test", null); + + new KubernetesScalingRealizer().realize(ctx, Map.of("a", "1", "b", "2")); + + assertThat( + ctx.getResource() + .getSpec() + .getFlinkConfiguration() + .get(PipelineOptions.PARALLELISM_OVERRIDES.key())) + .satisfiesAnyOf( + // Currently no enforced order inside the overrides string + overrides -> assertThat(overrides).isEqualTo("a:1,b:2"), + overrides -> assertThat(overrides).isEqualTo("b:2,a:1")); + } + + @Test + public void testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() { + // Create an overrides map which returns the keys in a deterministic order + LinkedHashMap newOverrides = new LinkedHashMap<>(); + newOverrides.put("b", "2"); + newOverrides.put("a", "1"); + + assertOverridesDoNotChange("a:1,b:2", newOverrides); + assertOverridesDoNotChange("b:2,a:1", newOverrides); + } + + private void assertOverridesDoNotChange( + String currentOverrides, LinkedHashMap newOverrides) { KubernetesJobAutoScalerContext ctx = TestingKubernetesAutoscalerUtils.createContext("test", null); + FlinkDeployment resource = (FlinkDeployment) ctx.getResource(); - // Create map which returns keys unsorted - Map overrides = new LinkedHashMap<>(); - overrides.put("b", "2"); - overrides.put("a", "1"); + // Create resource with existing parallelism overrides + resource.getSpec() + .getFlinkConfiguration() + .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), currentOverrides); + resource.getStatus() + .getReconciliationStatus() + .serializeAndSetLastReconciledSpec(resource.getSpec(), resource); + resource.getSpec() + .getFlinkConfiguration() + .remove(PipelineOptions.PARALLELISM_OVERRIDES.key()); - new KubernetesScalingRealizer().realize(ctx, overrides); + new KubernetesScalingRealizer().realize(ctx, newOverrides); assertThat( ctx.getResource() .getSpec() .getFlinkConfiguration() .get(PipelineOptions.PARALLELISM_OVERRIDES.key())) - .isEqualTo("a:1,b:2"); + .isEqualTo(currentOverrides); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java index 9cb92ada18..dbf0c3bcbf 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java @@ -22,6 +22,8 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import io.fabric8.kubernetes.client.KubernetesClient; @@ -42,7 +44,16 @@ public static KubernetesJobAutoScalerContext createContext( new Configuration(), new UnregisteredMetricsGroup(), () -> new RestClusterClient<>(new Configuration(), "test-cluster"), - cr, - kubernetesClient); + new FlinkDeploymentContext( + cr, + new TestUtils.TestingContext<>() { + @Override + public KubernetesClient getClient() { + return kubernetesClient; + } + }, + null, + new FlinkConfigManager(new Configuration()), + null)); } }