Skip to content

Commit

Permalink
[FLINK-32445][runtime] Refactors the closeAndCleanupAllData methods in BlobStoreService and HighAvailabilityServices (#23424)
Browse files: browse the repository at this point in the history
  • Loading branch information
Jiabao-Sun committed Oct 5, 2023
1 parent 1a446ca commit cd95b56
Show file tree
Hide file tree
Showing 30 changed files with 110 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public void testBlobServerRecovery() throws Exception {
TestingBlobHelpers.testBlobServerRecovery(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}

Expand All @@ -238,7 +239,8 @@ public void testBlobServerCorruptedFile() throws Exception {
TestingBlobHelpers.testGetFailsFromCorruptFile(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}

Expand All @@ -261,7 +263,8 @@ public void testBlobCacheRecovery() throws Exception {
TestingBlobHelpers.testBlobCacheRecovery(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}

Expand All @@ -284,7 +287,8 @@ public void testBlobCacheCorruptedFile() throws Exception {
TestingBlobHelpers.testGetFailsFromCorruptFile(
new JobID(), config, blobStoreService, temporaryFolder.newFolder());
} finally {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
/** Service interface for the BlobStore which allows to close and clean up its data. */
public interface BlobStoreService extends BlobStore, Closeable {

/** Closes and cleans up the store. This entails the deletion of all blobs. */
void closeAndCleanupAllData();
/** Cleans up the store. This entails the deletion of all blobs. */
void cleanupAllData();
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private boolean delete(String blobPath) {
}

@Override
public void closeAndCleanupAllData() {
public void cleanupAllData() {
try {
LOG.debug("Cleaning up {}.", basePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public boolean deleteAll(JobID jobId) {
}

@Override
public void closeAndCleanupAllData() {}
public void cleanupAllData() {}

@Override
public void close() throws IOException {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,11 +499,7 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {

if (haServices != null) {
try {
if (cleanupHaData) {
haServices.closeAndCleanupAllData();
} else {
haServices.close();
}
haServices.closeWithOptionalClean(cleanupHaData);
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
* #getLeaderPathForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and
* child path in Zookeeper.
*
* <p>{@link #close()} and {@link #closeAndCleanupAllData()} should be implemented to destroy the
* resources.
* <p>{@link #close()} and {@link #cleanupAllData()} should be implemented to destroy the resources.
*
* <p>The abstract class is also responsible for determining which component service should be
* reused. For example, {@link #jobResultStore} is created once and could be reused many times.
Expand Down Expand Up @@ -182,8 +181,8 @@ public void close() throws Exception {
}

@Override
public void closeAndCleanupAllData() throws Exception {
logger.info("Close and clean up all data for {}.", getClass().getSimpleName());
public void cleanupAllData() throws Exception {
logger.info("Clean up all data for {}.", getClass().getSimpleName());

Throwable exception = null;

Expand All @@ -192,40 +191,20 @@ public void closeAndCleanupAllData() throws Exception {
try {
internalCleanup();
deletedHAData = true;
blobStoreService.cleanupAllData();
} catch (Exception t) {
exception = t;
}

try {
if (leaderElectionService != null) {
leaderElectionService.close();
}
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
internalClose();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
if (deletedHAData) {
blobStoreService.closeAndCleanupAllData();
} else {
logger.info(
"Cannot delete HA blobs because we failed to delete the pointers in the HA store.");
blobStoreService.close();
}
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
if (!deletedHAData) {
logger.info(
"Cannot delete HA blobs because we failed to delete the pointers in the HA store.");
}

if (exception != null) {
ExceptionUtils.rethrowException(
exception,
"Could not properly close and clean up all data of high availability service.");
"Could not properly clean up all data of high availability service.");
}
logger.info("Finished cleaning up the high availability data.");
}
Expand Down Expand Up @@ -281,8 +260,7 @@ public CompletableFuture<Void> globalCleanupAsync(JobID jobID, Executor executor
* Clean up the meta data in the distributed system(e.g. Zookeeper, Kubernetes ConfigMap).
*
* <p>If an exception occurs during internal cleanup, we will continue the cleanup in {@link
* #closeAndCleanupAllData} and report exceptions only after all cleanup steps have been
* attempted.
* #cleanupAllData} and report exceptions only after all cleanup steps have been attempted.
*
* @throws Exception when do the cleanup operation on external storage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.FutureUtils;

import java.io.IOException;
Expand Down Expand Up @@ -212,19 +213,42 @@ default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
void close() throws Exception;

/**
* Closes the high availability services (releasing all resources) and deletes all data stored
* by these services in external stores.
* Deletes all data stored by high availability services in external stores.
*
* <p>After this method was called, the any job or session that was managed by these high
* <p>After this method was called, any job or session that was managed by these high
* availability services will be unrecoverable.
*
* <p>If an exception occurs during cleanup, this method will attempt to continue the cleanup
* and report exceptions only after all cleanup steps have been attempted.
*
* @throws Exception Thrown, if an exception occurred while closing these services or cleaning
* up data stored by them.
* @throws Exception if an error occurred while cleaning up data stored by them.
*/
void closeAndCleanupAllData() throws Exception;
void cleanupAllData() throws Exception;

/**
 * Shuts this instance down through {@link #close()}, optionally wiping all stored data via
 * {@link #cleanupAllData()} beforehand.
 *
 * <p>{@code close()} is always invoked, even when the preceding cleanup fails. Failures of both
 * steps are combined with {@link ExceptionUtils#firstOrSuppressed} and rethrown only after
 * {@code close()} has been attempted.
 *
 * @param cleanupData {@code true} to call {@link #cleanupAllData()} before closing; {@code
 *     false} to only close
 * @throws Exception if the cleanup step or the close step failed
 */
default void closeWithOptionalClean(boolean cleanupData) throws Exception {
    Throwable failure = null;

    // Step 1 (optional): delete the stored data, remembering any failure for later.
    try {
        if (cleanupData) {
            cleanupAllData();
        }
    } catch (Throwable cleanupFailure) {
        failure = ExceptionUtils.firstOrSuppressed(cleanupFailure, failure);
    }

    // Step 2: release the resources regardless of how the cleanup went.
    try {
        close();
    } catch (Throwable closeFailure) {
        failure = ExceptionUtils.firstOrSuppressed(closeFailure, failure);
    }

    if (failure == null) {
        return;
    }
    ExceptionUtils.rethrowException(failure);
}

@Override
default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ public void close() throws Exception {
}

@Override
public void closeAndCleanupAllData() throws Exception {
// this stores no data, so this method is the same as 'close()'
close();
public void cleanupAllData() throws Exception {
// this stores no data, do nothing here
}

// ----------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1276,15 +1276,7 @@ private void terminateMiniClusterServices(boolean cleanupHaData) throws Exceptio

// shut down high-availability services
if (haServices != null) {
try {
if (cleanupHaData) {
haServices.closeAndCleanupAllData();
} else {
haServices.close();
}
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
haServices.closeWithOptionalClean(cleanupHaData);
haServices = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ private void testGetFailsFromCorruptFile(
TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ void testBlobCacheRecovery() throws Exception {
config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ void testBlobFetchRetriesHa() throws IOException {
testBlobFetchRetries(blobStoreService, new JobID(), PERMANENT_BLOB);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down Expand Up @@ -148,7 +149,8 @@ void testBlobForJobFetchWithTooManyFailuresHa() throws IOException {
testBlobFetchWithTooManyFailures(blobStoreService, new JobID(), PERMANENT_BLOB);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ private void uploadFileGetTest(
}
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ void testGetFailsFromCorruptFile() throws IOException {
config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ void testGetFailsIncomingForJobHa() throws IOException {
}
} finally {
if (blobStore != null) {
blobStore.closeAndCleanupAllData();
blobStore.cleanupAllData();
blobStore.close();
}
}
}
Expand Down Expand Up @@ -269,7 +270,8 @@ void testGetFailsStoreForJobHa() throws IOException {
}
} finally {
if (blobStore != null) {
blobStore.closeAndCleanupAllData();
blobStore.cleanupAllData();
blobStore.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void testBlobServerRecovery() throws Exception {
config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected TestingDispatcher.Builder createTestingDispatcherBuilder() {
@After
public void tearDown() throws Exception {
if (haServices != null) {
haServices.closeAndCleanupAllData();
haServices.closeWithOptionalClean(true);
}
if (blobServer != null) {
blobServer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public void testClusterStartShouldObtainTokens() throws Exception {
@Test
public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
final CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
final CompletableFuture<Void> cleanupAllDataFuture = new CompletableFuture<>();
final HighAvailabilityServices testingHaService =
new TestingHighAvailabilityServicesBuilder()
.setCloseFuture(closeFuture)
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
.setCleanupAllDataFuture(cleanupAllDataFuture)
.build();
final TestingEntryPoint testingEntryPoint =
new TestingEntryPoint.Builder()
Expand All @@ -154,7 +154,7 @@ public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
is(ApplicationStatus.UNKNOWN));
assertThat(closeFuture.isDone(), is(true));
assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
assertThat(cleanupAllDataFuture.isDone(), is(false));
}

@Test
Expand Down Expand Up @@ -184,13 +184,13 @@ public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
@Test
public void testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws Exception {
final CompletableFuture<Void> deregisterFuture = new CompletableFuture<>();
final CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
final CompletableFuture<Void> cleanupAllDataFuture = new CompletableFuture<>();
final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
new CompletableFuture<>();

final HighAvailabilityServices testingHaService =
new TestingHighAvailabilityServicesBuilder()
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
.setCleanupAllDataFuture(cleanupAllDataFuture)
.build();
final TestingResourceManagerFactory testingResourceManagerFactory =
new TestingResourceManagerFactory.Builder()
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() thr
appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
is(ApplicationStatus.SUCCEEDED));
assertThat(deregisterFuture.isDone(), is(true));
assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
assertThat(cleanupAllDataFuture.isDone(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public void testRecoveryRegisterAndDownload() throws Exception {
}

if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
blobStoreService.cleanupAllData();
blobStoreService.close();
}
}
}
Expand Down
Loading

0 comments on commit cd95b56

Please sign in to comment.