From b946c5e569eeabf90666bbda9c458bca71029a7f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 21 Jun 2018 15:24:40 +0200 Subject: [PATCH] [hotfix] Make ActorSystemLoader in ClusterClient configurable --- .../flink/client/program/ClusterClient.java | 48 ++++++++++++++++--- .../program/StandaloneClusterClient.java | 4 ++ .../apache/flink/yarn/YarnClusterClient.java | 4 +- 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index c082b10f1c8bf..a157d34ccfc81 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -104,8 +104,8 @@ public abstract class ClusterClient { /** The optimizer used in the optimization of batch programs. */ final Optimizer compiler; - /** The actor system used to communicate with the JobManager. Lazily initialized upon first use */ - protected final LazyActorSystemLoader actorSystemLoader; + /** The actor system used to communicate with the JobManager. */ + protected final ActorSystemLoader actorSystemLoader; /** Configuration of the client. */ protected final Configuration flinkConfig; @@ -171,7 +171,10 @@ public ClusterClient(Configuration flinkConfig) throws Exception { * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval * @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down */ - public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) { + public ClusterClient( + Configuration flinkConfig, + HighAvailabilityServices highAvailabilityServices, + boolean sharedHaServices) { this.flinkConfig = Preconditions.checkNotNull(flinkConfig); this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig); @@ -188,14 +191,45 @@ public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAva this.sharedHaServices = sharedHaServices; } + public ClusterClient( + Configuration flinkConfig, + HighAvailabilityServices highAvailabilityServices, + boolean sharedHaServices, + ActorSystemLoader actorSystemLoader) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig); + + this.timeout = AkkaUtils.getClientTimeout(flinkConfig); + this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig); + + this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader); + + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); + this.sharedHaServices = sharedHaServices; + } + // ------------------------------------------------------------------------ // Startup & Shutdown // ------------------------------------------------------------------------ + /** + * Interface which allows to load an {@link ActorSystem}. + */ + public interface ActorSystemLoader extends AutoCloseable { + + /** + * Get an {@link ActorSystem}. + * + * @return {@link ActorSystem} + * @throws FlinkException + */ + ActorSystem get() throws FlinkException; + } + /** * Utility class to lazily instantiate an {@link ActorSystem}. */ - protected static class LazyActorSystemLoader { + protected static class LazyActorSystemLoader implements ActorSystemLoader { private final Logger log; @@ -226,7 +260,8 @@ public boolean isLoaded() { return actorSystem != null; } - public void shutdown() { + @Override + public void close() throws Exception { if (isLoaded()) { actorSystem.shutdown(); actorSystem.awaitTermination(); @@ -239,6 +274,7 @@ public void shutdown() { * @return ActorSystem * @throws Exception if the ActorSystem could not be created */ + @Override public ActorSystem get() throws FlinkException { if (!isLoaded()) { @@ -276,7 +312,7 @@ public ActorSystem get() throws FlinkException { */ public void shutdown() throws Exception { synchronized (this) { - actorSystemLoader.shutdown(); + actorSystemLoader.close(); if (!sharedHaServices && highAvailabilityServices != null) { highAvailabilityServices.close(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index e502add468c76..caee34f4b5e38 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -51,6 +51,10 @@ public StandaloneClusterClient(Configuration config, HighAvailabilityServices hi super(config, highAvailabilityServices, sharedHaServices); } + public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices, ActorSystemLoader actorSystemLoader) { + super(config, highAvailabilityServices, sharedHaServices, actorSystemLoader); + } + @Override public void waitForClusterToBeReady() {} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index 0d7546e96f901..a5aca5dddb183 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -285,14 +285,14 @@ public ApplicationId getApplicationId() { private static class LazApplicationClientLoader { private final Configuration flinkConfig; - private final LazyActorSystemLoader actorSystemLoader; + private final ActorSystemLoader actorSystemLoader; private final HighAvailabilityServices highAvailabilityServices; private ActorRef applicationClient; private LazApplicationClientLoader( Configuration flinkConfig, - LazyActorSystemLoader actorSystemLoader, + ActorSystemLoader actorSystemLoader, HighAvailabilityServices highAvailabilityServices) { this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig"); this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");