From 7fcb8ef8304c128e5ce72f4851518a86d4b11636 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Sun, 23 Apr 2023 17:40:22 +0200 Subject: [PATCH] [FLINK-31885] Trigger event on autoscaler error --- .../autoscaler/JobAutoScalerImpl.java | 34 ++++++++++++------- .../autoscaler/JobAutoscalerFactoryImpl.java | 3 +- .../autoscaler/metrics/ScalingMetrics.java | 2 ++ .../autoscaler/BacklogBasedScalingTest.java | 23 ++++++++++++- .../operator/utils/EventRecorder.java | 3 +- .../operator/utils/EventUtilsTest.java | 10 ++++++ 6 files changed, 59 insertions(+), 16 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index 0ed547c134..8023d7c29a 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import io.fabric8.kubernetes.client.KubernetesClient; @@ -48,6 +49,7 @@ public class JobAutoScalerImpl implements JobAutoScaler { private final ScalingMetricCollector metricsCollector; private final ScalingMetricEvaluator evaluator; private final ScalingExecutor scalingExecutor; + private final EventRecorder eventRecorder; private final Map>> lastEvaluatedMetrics = new ConcurrentHashMap<>(); @@ -57,13 +59,13 @@ public JobAutoScalerImpl( KubernetesClient kubernetesClient, ScalingMetricCollector metricsCollector, ScalingMetricEvaluator evaluator, - ScalingExecutor scalingExecutor) { - + ScalingExecutor scalingExecutor, + EventRecorder eventRecorder) { this.kubernetesClient = kubernetesClient; - this.metricsCollector = metricsCollector; this.evaluator = evaluator; this.scalingExecutor = scalingExecutor; + this.eventRecorder = eventRecorder; } @Override @@ -81,17 +83,17 @@ public boolean scale(FlinkResourceContext> var conf = ctx.getObserveConfig(); var resource = ctx.getResource(); - if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) { - LOG.info("Job autoscaler is disabled"); - return false; - } + try { + if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) { + LOG.info("Job autoscaler is disabled"); + return false; + } - if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) { - LOG.info("Job autoscaler is waiting for RUNNING job state"); - return false; - } + if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) { + LOG.info("Job autoscaler is waiting for RUNNING job state"); + return false; + } - try { var autoScalerInfo = AutoScalerInfo.forResource(resource, kubernetesClient); var collectedMetrics = @@ -113,8 +115,14 @@ public boolean scale(FlinkResourceContext> scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics); autoScalerInfo.replaceInKubernetes(kubernetesClient); return specAdjusted; - } catch (Exception e) { + } catch (Throwable e) { LOG.error("Error while scaling resource", e); + eventRecorder.triggerEvent( + resource, + EventRecorder.Type.Warning, + EventRecorder.Reason.AutoscalerError, + EventRecorder.Component.Operator, + e.getMessage()); return false; } } diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java index 128381de3c..9ae4aebbd6 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java @@ -36,6 +36,7 @@ public JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eve kubernetesClient, new RestApiMetricsCollector(), new ScalingMetricEvaluator(), - new ScalingExecutor(kubernetesClient, eventRecorder)); + new ScalingExecutor(kubernetesClient, eventRecorder), + eventRecorder); } } diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java index 0e4638c647..475013484b 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java @@ -83,6 +83,8 @@ public static void computeDataRateMetrics( scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, numRecordsInPerSecond); } else { LOG.error("Cannot compute true processing rate without numRecordsInPerSecond"); + scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN); + scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, Double.NaN); } } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java index e11f171d6c..7df42c6a0c 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -52,6 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for scaling metrics collection logic. */ @EnableKubernetesMockClient(crud = true) @@ -68,6 +69,8 @@ public class BacklogBasedScalingTest extends OperatorTestBase { private JobAutoScalerImpl autoscaler; + private EventCollector eventCollector = new EventCollector(); + @BeforeEach public void setup() { evaluator = new ScalingMetricEvaluator(); @@ -108,7 +111,11 @@ public void setup() { autoscaler = new JobAutoScalerImpl( - kubernetesClient, metricsCollector, evaluator, scalingExecutor); + kubernetesClient, + metricsCollector, + evaluator, + scalingExecutor, + new EventRecorder(kubernetesClient, eventCollector)); // Reset custom window size to default metricsCollector.setTestMetricWindowSize(null); @@ -373,6 +380,20 @@ public void testMetricsPersistedAfterRedeploy() { assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty()); } + @Test + public void testEventOnError() { + // Invalid config + app.getSpec() + .getFlinkConfiguration() + .put("kubernetes.operator.job.autoscaler.enabled", "3"); + autoscaler.scale(getResourceContext(app, createAutoscalerTestContext())); + + var event = eventCollector.events.poll(); + assertTrue(eventCollector.events.isEmpty()); + assertEquals(EventRecorder.Reason.AutoscalerError.toString(), event.getReason()); + assertTrue(event.getMessage().startsWith("Could not parse")); + } + private void redeployJob(Instant now) { app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli())); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index d3a3a51c93..bcf73afba4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -130,6 +130,7 @@ public enum Reason { RecoverDeployment, RestartUnhealthyJob, ScalingReport, - IneffectiveScaling + IneffectiveScaling, + AutoscalerError } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java index c40f324e2f..60fa84704b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java @@ -96,6 +96,16 @@ public void accept(Event event) { Assertions.assertNotNull(event); Assertions.assertEquals(eventConsumed, event); Assertions.assertEquals(2, event.getCount()); + + Assertions.assertTrue( + EventUtils.createOrUpdateEvent( + kubernetesClient, + flinkApp, + EventRecorder.Type.Warning, + reason, + null, + EventRecorder.Component.Operator, + consumer)); } @Test