Skip to content

Commit

Permalink
[MINOR] fix: Add method close for ApplicationManager (#704)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add  method `close` for ApplicationManager

### Why are the changes needed?
Clean resources while testing. It will improve the stability of our test.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA passed
  • Loading branch information
jerqi committed Mar 10, 2023
1 parent af21f0d commit 4072507
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.coordinator;

import java.io.Closeable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
Expand Down Expand Up @@ -46,7 +47,7 @@
import org.apache.uniffle.coordinator.strategy.storage.SelectStorageStrategy;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;

public class ApplicationManager {
public class ApplicationManager implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(ApplicationManager.class);
// TODO: Add anomaly detection for other storage
Expand All @@ -62,6 +63,7 @@ public class ApplicationManager {
private final Map<String, String> remoteStorageToHost = Maps.newConcurrentMap();
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
private final ScheduledExecutorService detectStorageScheduler;
private final ScheduledExecutorService checkAppScheduler;
private Map<String, Map<String, Long>> currentUserAndApp = Maps.newConcurrentMap();
private Map<String, String> appIdToUser = Maps.newConcurrentMap();
private QuotaManager quotaManager;
Expand Down Expand Up @@ -93,9 +95,9 @@ public ApplicationManager(CoordinatorConf conf) {
}
}
// the thread for checking application status
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
checkAppScheduler = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("ApplicationManager-%d"));
scheduledExecutorService.scheduleAtFixedRate(
checkAppScheduler.scheduleAtFixedRate(
this::statusCheck, expired / 2, expired / 2, TimeUnit.MILLISECONDS);
// the thread for checking if the storage is normal
detectStorageScheduler = Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -350,6 +352,15 @@ public static List<String> getPathSchema() {
return REMOTE_PATH_SCHEMA;
}

public void close() {
if (detectStorageScheduler != null) {
detectStorageScheduler.shutdownNow();
}
if (checkAppScheduler != null) {
checkAppScheduler.shutdownNow();
}
}

public enum StrategyName {
APP_BALANCE,
IO_SAMPLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public void stopServer() throws Exception {
if (jettyServer != null) {
jettyServer.stop();
}
if (applicationManager != null) {
applicationManager.close();
}
if (clusterManager != null) {
clusterManager.close();
}
Expand Down

0 comments on commit 4072507

Please sign in to comment.