Skip to content

Commit

Permalink
[HUDI-2959] Fix the thread leak of cleaning service
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Dec 9, 2021
1 parent bd08470 commit 41efa31
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -130,7 +129,7 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
future = res.getKey();
executor = res.getValue();
started = true;
monitorThreads(onShutdownCallback);
shutdownCallback(onShutdownCallback);
}

/**
Expand All @@ -141,34 +140,15 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
protected abstract Pair<CompletableFuture, ExecutorService> startService();

/**
* A monitor thread is started which would trigger a callback if the service is shutdown.
* Add shutdown callback for the completable future.
*
* @param onShutdownCallback
* @param callback The callback
*/
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
LOG.info("Submitting monitor thread !!");
Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "Monitor Thread");
t.setDaemon(isRunInDaemonMode());
return t;
}).submit(() -> {
boolean error = false;
try {
LOG.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true;
} catch (InterruptedException ie) {
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
} finally {
// Mark as shutdown
shutdown = true;
if (null != onShutdownCallback) {
onShutdownCallback.apply(error);
}
shutdown(false);
@SuppressWarnings("unchecked")
private void shutdownCallback(Function<Boolean, Boolean> callback) {
future.whenComplete((resp, error) -> {
if (null != callback) {
callback.apply(null != error);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,11 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
if (null == this.asyncCleanerService) {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);

private final AbstractHoodieWriteClient writeClient;
private final String cleanInstantTime;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) {
protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
this.writeClient = writeClient;
this.cleanInstantTime = cleanInstantTime;
}

@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(cleanInstantTime);
writeClient.clean(instantTime);
return true;
}), executor);
}, executor), executor);
}

public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient);
asyncCleanerService.start(null);
} else {
LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ public void initMetadataWriter() {
* checkpoint finish.
*/
public void startAsyncCleaning() {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
if (this.asyncCleanerService == null) {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,11 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation
}

@Override
public void close() throws Exception {
if (this.writeClient != null) {
this.writeClient.close();
}
}
}

0 comments on commit 41efa31

Please sign in to comment.