[FLINK-27665] Optimize event triggering on DeploymentFailedExceptions #220

Merged: 1 commit, May 18, 2022
File 1 of 7
@@ -30,11 +30,11 @@
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -142,14 +142,13 @@ private void handleDeploymentFailed(FlinkDeployment flinkApp, DeploymentFailedEx
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
ReconciliationUtils.updateForReconciliationError(flinkApp, dfe.getMessage());

// TODO: avoid repeated event
Event event = DeploymentFailedException.asEvent(dfe, flinkApp);
kubernetesClient
.v1()
.events()
.inNamespace(flinkApp.getMetadata().getNamespace())
.create(event);
EventUtils.createOrUpdateEvent(
kubernetesClient,
flinkApp,
EventUtils.Type.Warning,
dfe.getReason(),
dfe.getMessage(),
EventUtils.Component.JobManagerDeployment);
}

@Override
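The change above is what the removed "// TODO: avoid repeated event" was asking for: instead of unconditionally creating a fresh Event on every reconcile pass, the controller now goes through EventUtils.createOrUpdateEvent, which can update an existing event in place. The EventUtils internals are not part of this diff; the sketch below only illustrates how such a create-or-update helper can be written with the fabric8 client. The class name, the boolean return value, and the UUID-based name generation are assumptions made for the illustration; at the call sites the real EventUtils takes its type and component arguments as the Type and Component enums.

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.UUID;

/** Illustrative sketch only; not the operator's actual EventUtils implementation. */
public final class EventSketch {

    /** Creates the event if it does not exist yet, otherwise bumps its count. */
    public static boolean createOrUpdateEvent(
            KubernetesClient client,
            HasMetadata target,
            String type,
            String reason,
            String message,
            String component) {
        String namespace = target.getMetadata().getNamespace();
        // A deterministic name makes repeated failures map onto the same Event object,
        // so the cluster shows one event with an increasing count instead of a flood.
        String name =
                UUID.nameUUIDFromBytes(
                                (target.getMetadata().getUid() + reason + message + component)
                                        .getBytes(StandardCharsets.UTF_8))
                        .toString();

        Event existing = client.v1().events().inNamespace(namespace).withName(name).get();
        if (existing != null) {
            // Same failure seen again: update the existing event in place.
            existing.setCount(existing.getCount() == null ? 2 : existing.getCount() + 1);
            existing.setLastTimestamp(Instant.now().toString());
            client.v1().events().inNamespace(namespace).createOrReplace(existing);
            return false;
        }

        Event event =
                new EventBuilder()
                        .withApiVersion("v1")
                        .withNewMetadata()
                        .withName(name)
                        .withNamespace(namespace)
                        .endMetadata()
                        .withNewInvolvedObject()
                        .withKind(target.getKind())
                        .withName(target.getMetadata().getName())
                        .withNamespace(namespace)
                        .withUid(target.getMetadata().getUid())
                        .endInvolvedObject()
                        .withType(type)
                        .withReason(reason)
                        .withMessage(message)
                        .withCount(1)
                        .withFirstTimestamp(Instant.now().toString())
                        .withLastTimestamp(Instant.now().toString())
                        .withNewSource()
                        .withComponent(component)
                        .endSource()
                        .build();
        client.v1().events().inNamespace(namespace).create(event);
        return true;
    }
}

The essential difference from the removed .create(event) call is the lookup by deterministic name: a condition like CrashLoopBackOff that is observed on every reconcile loop updates one event's count instead of producing a new event each time.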
File 2 of 7
@@ -17,78 +17,34 @@

package org.apache.flink.kubernetes.operator.exception;

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;

import java.time.Instant;

/** Exception to signal terminal deployment failure. */
public class DeploymentFailedException extends RuntimeException {
public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment";

public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";

private static final long serialVersionUID = -1070179896083579221L;

public final String component;
public final String type;
public final String reason;
public final String lastTransitionTime;
public final String lastUpdateTime;
private final String reason;

public DeploymentFailedException(String component, DeploymentCondition deployCondition) {
public DeploymentFailedException(DeploymentCondition deployCondition) {
super(deployCondition.getMessage());
this.component = component;
this.type = deployCondition.getType();
this.reason = deployCondition.getReason();
this.lastTransitionTime = deployCondition.getLastTransitionTime();
this.lastUpdateTime = deployCondition.getLastUpdateTime();
}

public DeploymentFailedException(
String component, String type, ContainerStateWaiting stateWaiting) {
public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
super(stateWaiting.getMessage());
this.component = component;
this.type = type;
this.reason = stateWaiting.getReason();
this.lastTransitionTime = null;
this.lastUpdateTime = null;
}

public DeploymentFailedException(String message, String component, String type, String reason) {
public DeploymentFailedException(String message, String reason) {
super(message);
this.component = component;
this.type = type;
this.reason = reason;
this.lastTransitionTime = Instant.now().toString();
this.lastUpdateTime = lastTransitionTime;
}

public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment flinkApp) {
EventBuilder evtb =
new EventBuilder()
.withApiVersion("v1")
.withNewInvolvedObject()
.withKind(flinkApp.getKind())
.withName(flinkApp.getMetadata().getName())
.withNamespace(flinkApp.getMetadata().getNamespace())
.withUid(flinkApp.getMetadata().getUid())
.endInvolvedObject()
.withType(dfe.type)
.withReason(dfe.reason)
.withFirstTimestamp(dfe.lastTransitionTime)
.withLastTimestamp(dfe.lastUpdateTime)
.withMessage(dfe.getMessage())
.withNewMetadata()
.withGenerateName(flinkApp.getMetadata().getName())
.withNamespace(flinkApp.getMetadata().getNamespace())
.endMetadata()
.withNewSource()
.withComponent(dfe.component)
.endSource();
return evtb.build();
public String getReason() {
return reason;
}
}
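After this refactor the exception carries only a reason next to the inherited message; the component, type, and timestamp fields and the asEvent() builder are gone, and event construction is centralized in EventUtils. A small self-contained illustration, using made-up condition values that mirror the FailedCreate/ReplicaFailure check in the observer change further down:

import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;

import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentConditionBuilder;

/** Demo only: shows which values the slimmed-down exception still exposes. */
public class DeploymentFailedExceptionDemo {
    public static void main(String[] args) {
        // Hypothetical failure condition; the reason/type values match what the
        // observer looks for, the message is invented for the example.
        DeploymentCondition condition =
                new DeploymentConditionBuilder()
                        .withType("ReplicaFailure")
                        .withReason("FailedCreate")
                        .withMessage("pods \"flink-jobmanager\" is forbidden: exceeded quota")
                        .build();

        DeploymentFailedException dfe = new DeploymentFailedException(condition);

        // Exactly the two values the controller now forwards to EventUtils.createOrUpdateEvent.
        System.out.println(dfe.getReason());   // FailedCreate
        System.out.println(dfe.getMessage());  // pods "flink-jobmanager" is forbidden: exceeded quota
    }
}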
File 3 of 7
@@ -32,10 +32,10 @@
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventUtils;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -196,8 +196,7 @@ private void checkFailedCreate(DeploymentStatus status) {
List<DeploymentCondition> conditions = status.getConditions();
for (DeploymentCondition dc : conditions) {
if ("FailedCreate".equals(dc.getReason()) && "ReplicaFailure".equals(dc.getType())) {
throw new DeploymentFailedException(
DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
throw new DeploymentFailedException(dc);
}
}
}
@@ -210,8 +209,7 @@ private void checkCrashLoopBackoff(FlinkDeployment flinkApp, Configuration effec
if (csw != null
&& DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(
csw.getReason())) {
throw new DeploymentFailedException(
DeploymentFailedException.COMPONENT_JOBMANAGER, "Warning", csw);
throw new DeploymentFailedException(csw);
}
}
}
@@ -250,20 +248,14 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
private void onMissingDeployment(FlinkDeployment deployment) {
String err = "Missing JobManager deployment";
logger.error(err);
Event event =
DeploymentFailedException.asEvent(
new DeploymentFailedException(
err,
DeploymentFailedException.COMPONENT_JOBMANAGER,
"Error",
"Missing"),
deployment);
kubernetesClient
.v1()
.events()
.inNamespace(deployment.getMetadata().getNamespace())
.create(event);
ReconciliationUtils.updateForReconciliationError(deployment, err);
EventUtils.createOrUpdateEvent(
kubernetesClient,
deployment,
EventUtils.Type.Warning,
"Missing",
err,
EventUtils.Component.JobManagerDeployment);
}

/**
File 4 of 7
@@ -232,8 +232,6 @@ private Optional<UpgradeMode> getAvailableUpgradeMode(FlinkDeployment deployment
"JobManager deployment is missing and HA data is not available to make stateful upgrades. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ "Manual restore required.",
DeploymentFailedException.COMPONENT_JOBMANAGER,
"Error",
"UpgradeFailed");
} else {
LOG.debug(
File 5 of 7
@@ -175,8 +175,6 @@ private void validateHaMetadataExists(Configuration conf) {
"HA metadata not available to restore from last state. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ "Manual restore required.",
DeploymentFailedException.COMPONENT_JOBMANAGER,
"Error",
"RestoreFailed");
}
}
File 6 of 7
@@ -38,7 +38,8 @@ public enum Type {

/** The component of events. */
public enum Component {
Operator
Operator,
JobManagerDeployment
}

public static String generateEventName(
File 7 of 7
@@ -116,7 +116,7 @@ public void clear() {
public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
if (deployFailure) {
throw new DeploymentFailedException("Deployment failure", "test", "test", "test");
throw new DeploymentFailedException("Deployment failure", "test");
}
if (!jobs.isEmpty()) {
throw new Exception("Cannot submit 2 application clusters at the same time");