diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 5306d6f0e65e3..0aa9dfc6af712 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -574,11 +574,11 @@ private CompletableFuture removeJob(JobID jobId, boolean cleanupHA) { return jobManagerRunnerTerminationFuture.thenRunAsync( () -> { jobManagerMetricGroup.removeJob(jobId); - blobServer.cleanupJob(jobId, cleanupHA); if (cleanupHA) { try { submittedJobGraphStore.removeJobGraph(jobId); + blobServer.cleanupJob(jobId, cleanupHA); } catch (Exception e) { log.warn("Could not properly remove job {} from submitted job graph store.", jobId); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cebff5881ac6a..441f9487c7fd9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1734,6 +1734,8 @@ class JobManager( // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) + val result = blobServer.cleanupJob(jobID, removeJobFromStateBackend) + } catch { case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } @@ -1759,10 +1761,7 @@ class JobManager( case None => None } - // remove all job-related BLOBs from local and HA store libraryCacheManager.unregisterJob(jobID) - blobServer.cleanupJob(jobID, removeJobFromStateBackend) - jobManagerMetricGroup.removeJob(jobID) futureOption diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 745e9cb664eb2..ac4f1a862c591 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; @@ -72,6 +73,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; @@ -83,6 +85,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; @@ -125,6 +128,9 @@ public class DispatcherTest extends TestLogger { @Rule public TestName name = new TestName(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private JobGraph jobGraph; private TestingFatalErrorHandler fatalErrorHandler; @@ -294,6 +300,30 @@ public void testSubmittedJobGraphListener() throws Exception { assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1)); } + @Test + public void testBlobsAreRemovedOnlyIfJobIsRemovedProperly() throws Exception { + dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); + PermanentBlobKey key = blobServer.putPermanent(TEST_JOB_ID, new byte[128]); + submittedJobGraphStore.setRemovalFailure(new Exception("Failed to Remove future")); + final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder() + .setJobID(TEST_JOB_ID) + .setState(JobStatus.CANCELED) + .build(); + + dispatcher.completeJobExecution(executionGraph); + //Assert that blob was not removed, since exception was thrown while removing the job + assertThat(blobServer.getFile(TEST_JOB_ID, key), notNullValue(File.class)); + submittedJobGraphStore.setRemovalFailure(null); + dispatcher.completeJobExecution(executionGraph); + + //Job removing did not throw exception now, blob should be null + expectedException.expect(NoSuchFileException.class); + blobServer.getFile(TEST_JOB_ID, key); + } + @Test public void testOnAddedJobGraphRecoveryFailure() throws Exception { final FlinkException expectedFailure = new FlinkException("Expected failure"); @@ -619,10 +649,17 @@ private static final class FailableSubmittedJobGraphStore extends InMemorySubmit @Nullable private Exception recoveryFailure = null; + @Nullable + private Exception removalFailure = null; + void setRecoveryFailure(@Nullable Exception recoveryFailure) { this.recoveryFailure = recoveryFailure; } + void setRemovalFailure(@Nullable Exception removalFailure) { + this.removalFailure = removalFailure; + } + @Override public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { if (recoveryFailure != null) { @@ -631,5 +668,14 @@ public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Except return super.recoverJobGraph(jobId); } } + + @Override + public synchronized void removeJobGraph(JobID jobId) throws Exception { + if (removalFailure != null) { + throw removalFailure; + } else { + super.removeJobGraph(jobId); + } + } } }