Skip to content

Commit

Permalink
[hotfix] Pass in Rest address to Dispatcher as nullable String
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 30, 2017
1 parent dcbc966 commit f0e82dc
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
Expand Up @@ -56,13 +56,14 @@
import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


import javax.annotation.Nullable;

import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand Down Expand Up @@ -93,7 +94,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme


private final LeaderElectionService leaderElectionService; private final LeaderElectionService leaderElectionService;


private final CompletableFuture<String> restAddressFuture; @Nullable
protected final String restAddress;


protected Dispatcher( protected Dispatcher(
RpcService rpcService, RpcService rpcService,
Expand All @@ -105,7 +107,7 @@ protected Dispatcher(
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
Optional<String> restAddress) throws Exception { @Nullable String restAddress) throws Exception {
super(rpcService, endpointId); super(rpcService, endpointId);


this.configuration = Preconditions.checkNotNull(configuration); this.configuration = Preconditions.checkNotNull(configuration);
Expand All @@ -124,10 +126,8 @@ protected Dispatcher(
jobManagerRunners = new HashMap<>(16); jobManagerRunners = new HashMap<>(16);


leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService(); leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
this.restAddressFuture = restAddress
.map(CompletableFuture::completedFuture)
.orElse(FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint.")));


this.restAddress = restAddress;
} }


//------------------------------------------------------ //------------------------------------------------------
Expand Down Expand Up @@ -275,7 +275,11 @@ public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {


@Override @Override
public CompletableFuture<String> requestRestAddress(Time timeout) { public CompletableFuture<String> requestRestAddress(Time timeout) {
return restAddressFuture; if (restAddress != null) {
return CompletableFuture.completedFuture(restAddress);
} else {
return FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint."));
}
} }


@Override @Override
Expand Down
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;


import java.util.Optional; import javax.annotation.Nullable;


/** /**
* Dispatcher implementation which spawns a {@link JobMaster} for each * Dispatcher implementation which spawns a {@link JobMaster} for each
Expand All @@ -51,7 +51,7 @@ public StandaloneDispatcher(
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
Optional<String> restAddress) throws Exception { @Nullable String restAddress) throws Exception {
super( super(
rpcService, rpcService,
endpointId, endpointId,
Expand Down
Expand Up @@ -49,7 +49,8 @@


import akka.actor.ActorSystem; import akka.actor.ActorSystem;


import java.util.Optional; import javax.annotation.Nullable;

import java.util.concurrent.Executor; import java.util.concurrent.Executor;


/** /**
Expand Down Expand Up @@ -130,7 +131,7 @@ protected void startClusterComponents(
heartbeatServices, heartbeatServices,
metricRegistry, metricRegistry,
this, this,
Optional.of(dispatcherRestEndpoint.getRestAddress())); dispatcherRestEndpoint.getRestAddress());


LOG.debug("Starting ResourceManager."); LOG.debug("Starting ResourceManager.");
resourceManager.start(); resourceManager.start();
Expand Down Expand Up @@ -214,7 +215,7 @@ protected Dispatcher createDispatcher(
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
Optional<String> restAddress) throws Exception { @Nullable String restAddress) throws Exception {


// create the default dispatcher // create the default dispatcher
return new StandaloneDispatcher( return new StandaloneDispatcher(
Expand Down
Expand Up @@ -51,7 +51,6 @@
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;


import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -224,7 +223,7 @@ protected TestingDispatcher(
heartbeatServices, heartbeatServices,
metricRegistry, metricRegistry,
fatalErrorHandler, fatalErrorHandler,
Optional.empty()); null);


this.jobManagerRunner = jobManagerRunner; this.jobManagerRunner = jobManagerRunner;
this.expectedJobId = expectedJobId; this.expectedJobId = expectedJobId;
Expand Down

0 comments on commit f0e82dc

Please sign in to comment.