Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,43 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.function.SupplierWithException;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.Getter;

import javax.annotation.Nullable;

/** An implementation of JobAutoscalerContext for Kubernetes. */
public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {

private final AbstractFlinkResource<?, ?> resource;

private final KubernetesClient kubernetesClient;
@Getter private final FlinkResourceContext<?> resourceContext;

public KubernetesJobAutoScalerContext(
@Nullable JobID jobID,
@Nullable JobStatus jobStatus,
Configuration configuration,
MetricGroup metricGroup,
SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
AbstractFlinkResource<?, ?> resource,
KubernetesClient kubernetesClient) {
FlinkResourceContext<?> resourceContext) {
super(
ResourceID.fromResource(resource),
ResourceID.fromResource(resourceContext.getResource()),
jobID,
jobStatus,
configuration,
metricGroup,
restClientSupplier);
this.resource = resource;
this.kubernetesClient = kubernetesClient;
this.resourceContext = resourceContext;
}

public AbstractFlinkResource<?, ?> getResource() {
return resource;
return resourceContext.getResource();
}

public KubernetesClient getKubernetesClient() {
return kubernetesClient;
return resourceContext.getKubernetesClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

import io.javaoperatorsdk.operator.processing.event.ResourceID;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.TreeMap;

/** The Kubernetes implementation for applying parallelism overrides. */
public class KubernetesScalingRealizer
Expand All @@ -33,14 +34,34 @@ public class KubernetesScalingRealizer
@Override
public void realize(
KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
// Make sure the keys are sorted via TreeMap to prevent changing the spec when none of the
// entries changed but the key order is different!
parallelismOverrides = new TreeMap<>(parallelismOverrides);

context.getResource()
.getSpec()
.getFlinkConfiguration()
.put(
PipelineOptions.PARALLELISM_OVERRIDES.key(),
ConfigurationUtils.convertValue(parallelismOverrides, String.class));
getOverrideString(context, parallelismOverrides));
}

@Nullable
private static String getOverrideString(
KubernetesJobAutoScalerContext context, Map<String, String> newOverrides) {
if (context.getResource().getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
return ConfigurationUtils.convertValue(newOverrides, String.class);
}

var conf = context.getResourceContext().getObserveConfig();
var currentOverrides =
conf.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).orElse(Map.of());

// Check that the overrides actually changed and not just the String representation.
// This way we prevent reconciling a NOOP config change which would unnecessarily redeploy
// the pipeline.
if (currentOverrides.equals(newOverrides)) {
// If overrides are identical, use the previous string as-is.
return conf.getValue(PipelineOptions.PARALLELISM_OVERRIDES);
} else {
return ConfigurationUtils.convertValue(newOverrides, String.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
conf,
getResourceMetricGroup(),
() -> getFlinkService().getClusterClient(conf),
resource,
getKubernetesClient());
this);
}

@Nullable
Expand All @@ -104,6 +103,7 @@ private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
*
* @return Config currently deployed.
*/
@Nullable
public Configuration getObserveConfig() {
if (observeConfig != null) {
return observeConfig;
Expand All @@ -118,6 +118,7 @@ public Configuration getObserveConfig() {
* @param spec Spec for which the config should be created.
* @return Deployment configuration.
*/
@Nullable
public abstract Configuration getDeployConfig(AbstractFlinkSpec spec);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
ctx.getResource().getSpec().getJob() != null
&& ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);

autoscaler.scale(autoScalerCtx);
Copy link
Contributor

@afedulov afedulov Dec 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment to the section below explaining why this call to scale does not lead to the unnecessary update if we only patch the new config after it. I personally do not directly understand why this is the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call to scale can still lead to a spec change, even with the code below. It just prevents spec changes related to a non-deterministic ordering of the parallelism overrides, e.g. a:1,b2 and b:2,a:1. I think it is a good idea to add documentation on how the reconciliation loop works, but it doesn't feel directly related to the changes here. There is a comment in line 211 which states that we avoid changing the spec. I can try to expand a little more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correct. The root cause is the logic related to boolean specChanged in AbstractFlinkResourceReconciler#reconcile. The specChanged will be true, when the ordering of the parallelism overrides is changed. And operator will scale job.

Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks.

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;

import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;

import org.junit.jupiter.api.Test;

Expand All @@ -30,23 +31,59 @@
public class KubernetesScalingRealizerTest {

@Test
public void testAutoscalerOverridesVertexIdsAreSorted() {
public void testApplyOverrides() {
KubernetesJobAutoScalerContext ctx =
TestingKubernetesAutoscalerUtils.createContext("test", null);

new KubernetesScalingRealizer().realize(ctx, Map.of("a", "1", "b", "2"));

assertThat(
ctx.getResource()
.getSpec()
.getFlinkConfiguration()
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
.satisfiesAnyOf(
// Currently no enforced order inside the overrides string
overrides -> assertThat(overrides).isEqualTo("a:1,b:2"),
overrides -> assertThat(overrides).isEqualTo("b:2,a:1"));
}

@Test
public void testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
// Create an overrides map which returns the keys in a deterministic order
LinkedHashMap<String, String> newOverrides = new LinkedHashMap<>();
newOverrides.put("b", "2");
newOverrides.put("a", "1");

assertOverridesDoNotChange("a:1,b:2", newOverrides);
assertOverridesDoNotChange("b:2,a:1", newOverrides);
}

private void assertOverridesDoNotChange(
String currentOverrides, LinkedHashMap<String, String> newOverrides) {

KubernetesJobAutoScalerContext ctx =
TestingKubernetesAutoscalerUtils.createContext("test", null);
FlinkDeployment resource = (FlinkDeployment) ctx.getResource();

// Create map which returns keys unsorted
Map<String, String> overrides = new LinkedHashMap<>();
overrides.put("b", "2");
overrides.put("a", "1");
// Create resource with existing parallelism overrides
resource.getSpec()
.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), currentOverrides);
resource.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(resource.getSpec(), resource);
resource.getSpec()
.getFlinkConfiguration()
.remove(PipelineOptions.PARALLELISM_OVERRIDES.key());

new KubernetesScalingRealizer().realize(ctx, overrides);
new KubernetesScalingRealizer().realize(ctx, newOverrides);

assertThat(
ctx.getResource()
.getSpec()
.getFlinkConfiguration()
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
.isEqualTo("a:1,b:2");
.isEqualTo(currentOverrides);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -42,7 +44,16 @@ public static KubernetesJobAutoScalerContext createContext(
new Configuration(),
new UnregisteredMetricsGroup(),
() -> new RestClusterClient<>(new Configuration(), "test-cluster"),
cr,
kubernetesClient);
new FlinkDeploymentContext(
cr,
new TestUtils.TestingContext<>() {
@Override
public KubernetesClient getClient() {
return kubernetesClient;
}
},
null,
new FlinkConfigManager(new Configuration()),
null));
}
}