Skip to content

Commit

Permalink
[hotfix] Make ActorSystemLoader in ClusterClient configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jun 22, 2018
1 parent e64d4db commit b946c5e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 8 deletions.
Expand Up @@ -104,8 +104,8 @@ public abstract class ClusterClient<T> {
/** The optimizer used in the optimization of batch programs. */ /** The optimizer used in the optimization of batch programs. */
final Optimizer compiler; final Optimizer compiler;


/** The actor system used to communicate with the JobManager. Lazily initialized upon first use */ /** The actor system used to communicate with the JobManager. */
protected final LazyActorSystemLoader actorSystemLoader; protected final ActorSystemLoader actorSystemLoader;


/** Configuration of the client. */ /** Configuration of the client. */
protected final Configuration flinkConfig; protected final Configuration flinkConfig;
Expand Down Expand Up @@ -171,7 +171,10 @@ public ClusterClient(Configuration flinkConfig) throws Exception {
* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
* @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down * @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down
*/ */
public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) { public ClusterClient(
Configuration flinkConfig,
HighAvailabilityServices highAvailabilityServices,
boolean sharedHaServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig); this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig); this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);


Expand All @@ -188,14 +191,45 @@ public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAva
this.sharedHaServices = sharedHaServices; this.sharedHaServices = sharedHaServices;
} }


public ClusterClient(
Configuration flinkConfig,
HighAvailabilityServices highAvailabilityServices,
boolean sharedHaServices,
ActorSystemLoader actorSystemLoader) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);

this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);

this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader);

this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.sharedHaServices = sharedHaServices;
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Startup & Shutdown // Startup & Shutdown
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


/**
* Interface which allows to load an {@link ActorSystem}.
*/
public interface ActorSystemLoader extends AutoCloseable {

/**
* Get an {@link ActorSystem}.
*
* @return {@link ActorSystem}
* @throws FlinkException
*/
ActorSystem get() throws FlinkException;
}

/** /**
* Utility class to lazily instantiate an {@link ActorSystem}. * Utility class to lazily instantiate an {@link ActorSystem}.
*/ */
protected static class LazyActorSystemLoader { protected static class LazyActorSystemLoader implements ActorSystemLoader {


private final Logger log; private final Logger log;


Expand Down Expand Up @@ -226,7 +260,8 @@ public boolean isLoaded() {
return actorSystem != null; return actorSystem != null;
} }


public void shutdown() { @Override
public void close() throws Exception {
if (isLoaded()) { if (isLoaded()) {
actorSystem.shutdown(); actorSystem.shutdown();
actorSystem.awaitTermination(); actorSystem.awaitTermination();
Expand All @@ -239,6 +274,7 @@ public void shutdown() {
* @return ActorSystem * @return ActorSystem
* @throws Exception if the ActorSystem could not be created * @throws Exception if the ActorSystem could not be created
*/ */
@Override
public ActorSystem get() throws FlinkException { public ActorSystem get() throws FlinkException {


if (!isLoaded()) { if (!isLoaded()) {
Expand Down Expand Up @@ -276,7 +312,7 @@ public ActorSystem get() throws FlinkException {
*/ */
public void shutdown() throws Exception { public void shutdown() throws Exception {
synchronized (this) { synchronized (this) {
actorSystemLoader.shutdown(); actorSystemLoader.close();


if (!sharedHaServices && highAvailabilityServices != null) { if (!sharedHaServices && highAvailabilityServices != null) {
highAvailabilityServices.close(); highAvailabilityServices.close();
Expand Down
Expand Up @@ -51,6 +51,10 @@ public StandaloneClusterClient(Configuration config, HighAvailabilityServices hi
super(config, highAvailabilityServices, sharedHaServices); super(config, highAvailabilityServices, sharedHaServices);
} }


public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices, ActorSystemLoader actorSystemLoader) {
super(config, highAvailabilityServices, sharedHaServices, actorSystemLoader);
}

@Override @Override
public void waitForClusterToBeReady() {} public void waitForClusterToBeReady() {}


Expand Down
Expand Up @@ -285,14 +285,14 @@ public ApplicationId getApplicationId() {
private static class LazApplicationClientLoader { private static class LazApplicationClientLoader {


private final Configuration flinkConfig; private final Configuration flinkConfig;
private final LazyActorSystemLoader actorSystemLoader; private final ActorSystemLoader actorSystemLoader;
private final HighAvailabilityServices highAvailabilityServices; private final HighAvailabilityServices highAvailabilityServices;


private ActorRef applicationClient; private ActorRef applicationClient;


private LazApplicationClientLoader( private LazApplicationClientLoader(
Configuration flinkConfig, Configuration flinkConfig,
LazyActorSystemLoader actorSystemLoader, ActorSystemLoader actorSystemLoader,
HighAvailabilityServices highAvailabilityServices) { HighAvailabilityServices highAvailabilityServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig"); this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");
this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader"); this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");
Expand Down

0 comments on commit b946c5e

Please sign in to comment.