Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-27675] Improve manual savepoint tracking #225

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -51,9 +52,11 @@ public void setTrigger(String triggerId) {
this.triggerTimestamp = System.currentTimeMillis();
}

public void resetTrigger() {
public boolean resetTrigger() {
boolean reseted = StringUtils.isNotEmpty(this.triggerId);
this.triggerId = "";
this.triggerTimestamp = 0L;
return reseted;
}

public void updateLastSavepoint(Savepoint savepoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@
@Value
public class SavepointFetchResult {
Savepoint savepoint;
boolean isTriggered;
boolean pending;
String error;

/**
 * Creates a result that carries no savepoint and no error, with the boolean flag set to
 * {@code false}.
 */
public static SavepointFetchResult notTriggered() {
return new SavepointFetchResult(null, false, null);
}

/**
 * Creates a result describing a failed savepoint fetch.
 *
 * <p>The returned result never has a {@code null} error: if {@code error} is {@code null}, a
 * generic message is used instead. Without this guard a failure reported with a missing
 * message would carry neither a savepoint nor an error, and code that detects failures by
 * checking for a non-null error could not distinguish it from a successful result.
 *
 * @param error description of the failure; may be {@code null}
 * @return a result with no savepoint, the boolean flag set to {@code false}, and a non-null
 *     error message
 */
public static SavepointFetchResult error(String error) {
    // Exception messages (Throwable#getMessage) are allowed to be null, so normalize here.
    String nonNullError = error != null ? error : "Unknown savepoint error";
    return new SavepointFetchResult(null, false, nonNullError);
}
Expand All @@ -41,6 +37,6 @@ public static SavepointFetchResult pending() {
}

public static SavepointFetchResult completed(Savepoint savepoint) {
return new SavepointFetchResult(savepoint, true, null);
return new SavepointFetchResult(savepoint, false, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;

Expand Down Expand Up @@ -70,7 +70,18 @@ public void observeSavepointStatus(
.orElse(null);

observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
.ifPresent(err -> ReconciliationUtils.updateForReconciliationError(resource, err));
.ifPresent(
err ->
EventUtils.createOrUpdateEvent(
flinkService.getKubernetesClient(),
resource,
EventUtils.Type.Warning,
"SavepointError",
"Savepoint failed for savepointTriggerNonce: "
+ resource.getSpec()
.getJob()
.getSavepointTriggerNonce(),
EventUtils.Component.Operator));

// We only need to observe latest checkpoint/savepoint for terminal jobs
if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
Expand Down Expand Up @@ -106,36 +117,36 @@ private Optional<String> observeTriggeredSavepointProgress(
LOG.debug("Savepoint not in progress");
return Optional.empty();
}
LOG.info("Observing savepoint status");
SavepointFetchResult savepointFetchResult;
try {
savepointFetchResult =
flinkService.fetchSavepointInfo(
currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
} catch (Exception e) {
LOG.error("Exception while fetching savepoint info", e);
return Optional.empty();
}

if (!savepointFetchResult.isTriggered()) {
String error = savepointFetchResult.getError();
if (error != null
|| SavepointUtils.gracePeriodEnded(
configManager.getOperatorConfiguration(), currentSavepointInfo)) {
String errorMsg = error != null ? error : "Savepoint status unknown";
LOG.error(errorMsg);
LOG.info("Observing savepoint status.");
SavepointFetchResult savepointFetchResult =
flinkService.fetchSavepointInfo(
currentSavepointInfo.getTriggerId(), jobID, deployedConfig);

if (savepointFetchResult.isPending()) {
if (SavepointUtils.gracePeriodEnded(
configManager.getOperatorConfiguration(), currentSavepointInfo)) {
String errorMsg =
"Savepoint operation timed out after "
+ configManager
.getOperatorConfiguration()
.getSavepointTriggerGracePeriod();
currentSavepointInfo.resetTrigger();
LOG.error(errorMsg);
return Optional.of(errorMsg);
} else {
LOG.info("Savepoint operation not finished yet, waiting within grace period...");
return Optional.empty();
}
LOG.info("Savepoint operation not running, waiting within grace period...");
}
if (savepointFetchResult.getSavepoint() == null) {
LOG.info("Savepoint is still in progress...");
return Optional.empty();

if (savepointFetchResult.getError() != null) {
currentSavepointInfo.resetTrigger();
return Optional.of(savepointFetchResult.getError());
}

LOG.info("Savepoint status updated with latest completed savepoint info");
currentSavepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());

updateSavepointHistory(
currentSavepointInfo, savepointFetchResult.getSavepoint(), deployedConfig);
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ public void observe(FlinkDeployment flinkApp, Context context) {
}
}

if (!ReconciliationUtils.isJobRunning(flinkApp.getStatus())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might also need to create an event when canceling the savepoint operation. At the very least, we should add a log message when an actual reset happens (i.e. when the triggerId is not empty).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, PTAL

if (flinkApp.getStatus().getJobStatus().getSavepointInfo().resetTrigger()) {
logger.error("Job is not running, cancelling savepoint operation");
EventUtils.createOrUpdateEvent(
flinkService.getKubernetesClient(),
flinkApp,
EventUtils.Type.Warning,
"SavepointError",
"Savepoint failed for savepointTriggerNonce: "
+ flinkApp.getSpec().getJob().getSavepointTriggerNonce(),
EventUtils.Component.Operator);
}
}

clearErrorsIfDeploymentIsHealthy(flinkApp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public FlinkService(KubernetesClient kubernetesClient, FlinkConfigManager config
4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
}

/**
 * Returns the Kubernetes client held by this service.
 *
 * @return the {@code kubernetesClient} field of this instance
 */
public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}

public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
LOG.info(
Expand Down Expand Up @@ -639,7 +643,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) th
}

public SavepointFetchResult fetchSavepointInfo(
String triggerId, String jobId, Configuration conf) throws Exception {
String triggerId, String jobId, Configuration conf) {
LOG.info("Fetching savepoint result with triggerId: " + triggerId);
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
Expand All @@ -656,24 +660,28 @@ public SavepointFetchResult fetchSavepointInfo(
EmptyRequestBody.getInstance());

if (response.get() == null || response.get().resource() == null) {
return SavepointFetchResult.notTriggered();
return SavepointFetchResult.pending();
}

if (response.get().resource().getLocation() == null) {
if (response.get().resource().getFailureCause() != null) {
LOG.error("Savepoint error", response.get().resource().getFailureCause());
LOG.error(
"Failure occurred while fetching the savepoint result",
response.get().resource().getFailureCause());
return SavepointFetchResult.error(
response.get().resource().getFailureCause().getMessage());
response.get().resource().getFailureCause().toString());
} else {
return SavepointFetchResult.pending();
}
}

Savepoint savepoint =
new Savepoint(
System.currentTimeMillis(), response.get().resource().getLocation());
LOG.info("Savepoint result: " + savepoint);
return SavepointFetchResult.completed(savepoint);
} catch (Exception e) {
LOG.error("Exception while fetching the savepoint result", e);
return SavepointFetchResult.error(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;

Expand All @@ -68,9 +69,10 @@ public class TestingFlinkService extends FlinkService {

public static final Map<String, String> CLUSTER_INFO =
Map.of(
DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "15.0.0",
DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
"15.0.0",
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
"1234567 @ 1970-01-01T00:00:00+00:00");
"1234567 @ 1970-01-01T00:00:00+00:00");

private int savepointCounter = 0;
private int triggerCounter = 0;
Expand All @@ -84,11 +86,16 @@ public class TestingFlinkService extends FlinkService {
private PodList podList = new PodList();
private Consumer<Configuration> listJobConsumer = conf -> {};
private List<String> disposedSavepoints = new ArrayList<>();
private SavepointFetchResult savepointFetchResult;

/**
 * Creates a testing service without a Kubernetes client ({@code null} is passed to the parent
 * constructor) and with a config manager built from an empty {@link Configuration}.
 */
public TestingFlinkService() {
super(null, new FlinkConfigManager(new Configuration()));
}

/**
 * Creates a testing service that uses the given Kubernetes client and a config manager built
 * from an empty {@link Configuration}.
 *
 * @param kubernetesClient client forwarded to the parent {@code FlinkService} constructor
 */
public TestingFlinkService(KubernetesClient kubernetesClient) {
super(kubernetesClient, new FlinkConfigManager(new Configuration()));
}

public Context getContext() {
return new Context() {
@Override
Expand Down Expand Up @@ -297,6 +304,17 @@ protected void waitForClusterShutdown(Configuration conf) {}
/**
 * Test double that replays a fixed sequence of savepoint fetch results, ignoring its arguments.
 *
 * <p>The first call stores and returns a pending result. While the stored result reports
 * {@code isPending()}, the next call replaces it with an error result whose message is
 * {@code "Failed"} and returns that. Every other call returns a completed result whose
 * savepoint is named {@code "savepoint_" + savepointCounter}, incrementing the counter each
 * time.
 *
 * <p>NOTE(review): the pending -> error -> completed order assumes that
 * {@code SavepointFetchResult.pending()} yields a result for which {@code isPending()} is true
 * and that an error result is not pending; confirm against {@code SavepointFetchResult}.
 */
@Override
public SavepointFetchResult fetchSavepointInfo(
        String triggerId, String jobId, Configuration conf) {

    final boolean nothingRecordedYet = savepointFetchResult == null;

    // Short-circuit keeps isPending() from being invoked on a null reference.
    if (nothingRecordedYet || savepointFetchResult.isPending()) {
        if (nothingRecordedYet) {
            savepointFetchResult = SavepointFetchResult.pending();
        } else {
            savepointFetchResult = SavepointFetchResult.error("Failed");
        }
        return savepointFetchResult;
    }

    final String savepointName = "savepoint_" + savepointCounter++;
    return SavepointFetchResult.completed(Savepoint.of(savepointName));
}

Expand Down