Skip to content

Commit

Permalink
[FLINK-10247] Return TaskManager metricy query paths in non-blocking …
Browse files Browse the repository at this point in the history
…fashion
  • Loading branch information
tillrohrmann committed Oct 10, 2018
1 parent cbed19c commit d7550d6
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;


/** /**
* Serializable {@link Optional}. * Serializable {@link Optional}.
Expand Down Expand Up @@ -58,10 +59,26 @@ public void ifPresent(Consumer<? super T> consumer) {
} }
} }


public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
if (value == null) {
return Optional.empty();
} else {
return Optional.ofNullable(mapper.apply(value));
}
}

public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) { public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) {
return new SerializableOptional<>(value); return new SerializableOptional<>(value);
} }


public static <T extends Serializable> SerializableOptional<T> ofNullable(@Nullable T value) {
if (value == null) {
return empty();
} else {
return of(value);
}
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T extends Serializable> SerializableOptional<T> empty() { public static <T extends Serializable> SerializableOptional<T> empty() {
return (SerializableOptional<T>) EMPTY; return (SerializableOptional<T>) EMPTY;
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
Expand Down Expand Up @@ -75,11 +74,13 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;


import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;


Expand Down Expand Up @@ -581,25 +582,26 @@ public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout)


@Override @Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size()); final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, String>>>> metricQueryServicePathFutures = new ArrayList<>(taskExecutors.size());


for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) { for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
final ResourceID tmResourceId = workerRegistrationEntry.getKey(); final ResourceID tmResourceId = workerRegistrationEntry.getKey();
final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue(); final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue();
final TaskExecutorGateway taskExecutorGateway = workerRegistration.getTaskExecutorGateway(); final TaskExecutorGateway taskExecutorGateway = workerRegistration.getTaskExecutorGateway();
String tmMetricQueryServicePath = null;
try {
tmMetricQueryServicePath =
taskExecutorGateway.getMetricQueryServiceAddress(AkkaUtils.getDefaultTimeout()).get();
} catch (Exception e) {
log.info("Fail to get task manager's metric query service path for worker: {}", taskExecutorGateway.getAddress());
continue;
}


metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath)); final CompletableFuture<Optional<Tuple2<ResourceID, String>>> metricQueryServicePathFuture = taskExecutorGateway
.requestMetricQueryServiceAddress(timeout)
.thenApply(optional -> optional.map(path -> Tuple2.of(tmResourceId, path)));

metricQueryServicePathFutures.add(metricQueryServicePathFuture);
} }


return CompletableFuture.completedFuture(metricQueryServicePaths); return FutureUtils.combineAll(metricQueryServicePathFutures).thenApply(
collection -> collection
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList()));
} }


@Override @Override
Expand Down
Expand Up @@ -102,6 +102,7 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkException;


Expand Down Expand Up @@ -158,6 +159,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final BlobCacheService blobCacheService; private final BlobCacheService blobCacheService;


/** The path to metric query service on this Task Manager. */ /** The path to metric query service on this Task Manager. */
@Nullable
private final String metricQueryServicePath; private final String metricQueryServicePath;


// --------- TaskManager services -------- // --------- TaskManager services --------
Expand Down Expand Up @@ -214,7 +216,7 @@ public TaskExecutor(
TaskManagerServices taskExecutorServices, TaskManagerServices taskExecutorServices,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
TaskManagerMetricGroup taskManagerMetricGroup, TaskManagerMetricGroup taskManagerMetricGroup,
String metricQueryServicePath, @Nullable String metricQueryServicePath,
BlobCacheService blobCacheService, BlobCacheService blobCacheService,
FatalErrorHandler fatalErrorHandler) { FatalErrorHandler fatalErrorHandler) {


Expand Down Expand Up @@ -853,8 +855,8 @@ public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType,
} }


@Override @Override
public CompletableFuture<String> getMetricQueryServiceAddress(Time timeout) { public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
return CompletableFuture.completedFuture(metricQueryServicePath); return CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath));
} }


// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.SerializableOptional;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand Down Expand Up @@ -201,5 +202,5 @@ CompletableFuture<Acknowledge> freeSlot(
* *
* @return Future String with Fully qualified (RPC) address of Metric Query Service on the TaskManager. * @return Future String with Fully qualified (RPC) address of Metric Query Service on the TaskManager.
*/ */
CompletableFuture<String> getMetricQueryServiceAddress(@RpcTimeout Time timeout); CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(@RpcTimeout Time timeout);
} }
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -153,8 +154,8 @@ public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType,
} }


@Override @Override
public CompletableFuture<String> getMetricQueryServiceAddress(Time timeout) { public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
return CompletableFuture.completedFuture(address); return CompletableFuture.completedFuture(SerializableOptional.of(address));
} }


@Override @Override
Expand Down

0 comments on commit d7550d6

Please sign in to comment.