Skip to content

Commit

Permalink
[FLINK-12057] Refactor MemoryLogger to accept termination future inst…
Browse files Browse the repository at this point in the history
…ead of ActorSystem
  • Loading branch information
zentol committed Mar 29, 2019
1 parent 72a6f3f commit 41769a0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 13 deletions.
Expand Up @@ -159,7 +159,7 @@ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) thr
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;

MemoryLogger.startIfConfigured(LOG, configuration, metricQueryServiceActorSystem);
MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}

// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

import akka.actor.ActorSystem;
import org.slf4j.Logger;

import javax.management.MBeanServer;
Expand All @@ -34,6 +33,7 @@
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* A thread the periodically logs statistics about:
Expand All @@ -57,14 +57,14 @@ public class MemoryLogger extends Thread {

private final BufferPoolMXBean directBufferBean;

private final ActorSystem monitored;
private final CompletableFuture<Void> monitored;

private volatile boolean running = true;

public static void startIfConfigured(
Logger logger,
Configuration configuration,
ActorSystem taskManagerSystem) {
CompletableFuture<Void> taskManagerTerminationFuture) {
if (!logger.isInfoEnabled() || !configuration.getBoolean(TaskManagerOptions.DEBUG_MEMORY_LOG)) {
return;
}
Expand All @@ -73,19 +73,19 @@ public static void startIfConfigured(
new MemoryLogger(
logger,
configuration.getLong(TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS),
taskManagerSystem).start();
taskManagerTerminationFuture).start();
}

/**
* Creates a new memory logger that logs in the given interval and lives as long as the
* given actor system.
* Creates a new memory logger that logs in the given interval and lives until the
* given termination future completes.
*
* @param logger The logger to use for outputting the memory statistics.
* @param interval The interval in which the thread logs.
* @param monitored The actor system to whose life the thread is bound. The thread terminates
* once the actor system terminates.
* @param monitored termination future for the system to whose life the thread is bound. The thread terminates
* once the system terminates.
*/
public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
public MemoryLogger(Logger logger, long interval, CompletableFuture<Void> monitored) {
super("Memory Logger");
setDaemon(true);
setPriority(Thread.MIN_PRIORITY);
Expand Down Expand Up @@ -125,7 +125,7 @@ public void shutdown() {
@Override
public void run() {
try {
while (running && (monitored == null || !monitored.whenTerminated().isCompleted())) {
while (running && (monitored == null || !monitored.isDone())) {
logger.info(getMemoryUsageStatsAsString(memoryBean));
logger.info(getDirectMemoryStatsAsString(directBufferBean));
logger.info(getMemoryPoolStatsAsString(poolBeans));
Expand Down
Expand Up @@ -1885,8 +1885,6 @@ object TaskManager {
)
}

MemoryLogger.startIfConfigured(LOG.logger, configuration, taskManagerSystem)

// block until everything is done
Await.ready(taskManagerSystem.whenTerminated, Duration.Inf)
} catch {
Expand Down

0 comments on commit 41769a0

Please sign in to comment.