Skip to content

Commit

Permalink
[FLINK-11603][metrics] Port MetricQueryService to RpcEndpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and zentol committed Apr 4, 2019
1 parent 306a8eb commit 6e8a3f3
Show file tree
Hide file tree
Showing 48 changed files with 470 additions and 625 deletions.
Expand Up @@ -553,7 +553,7 @@ public static Configuration cloneConfiguration(Configuration configuration) {
/**
* Configuration interface for {@link ActorSystem} underlying executor.
*/
interface ActorSystemExecutorConfiguration {
public interface ActorSystemExecutorConfiguration {

/**
* Create the executor {@link Config} for the respective executor.
Expand Down
Expand Up @@ -126,7 +126,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private final HistoryServerArchivist historyServerArchivist;

@Nullable
private final String metricQueryServicePath;
private final String metricServiceQueryAddress;

private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;

Expand All @@ -142,7 +142,7 @@ public Dispatcher(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricServiceQueryPath,
@Nullable String metricServiceQueryAddress,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
Expand All @@ -157,7 +157,7 @@ public Dispatcher(
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore);
this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup);
this.metricQueryServicePath = metricServiceQueryPath;
this.metricServiceQueryAddress = metricServiceQueryAddress;

this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(
configuration,
Expand Down Expand Up @@ -550,17 +550,17 @@ public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout)
}

@Override
public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
if (metricQueryServicePath != null) {
return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(Time timeout) {
if (metricServiceQueryAddress != null) {
return CompletableFuture.completedFuture(Collections.singleton(metricServiceQueryAddress));
} else {
return CompletableFuture.completedFuture(Collections.emptyList());
}
}

@Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
return runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout));
public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServiceAddresses(Time timeout) {
return runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestTaskManagerMetricQueryServiceAddresses(timeout));
}

@Override
Expand Down
Expand Up @@ -48,7 +48,7 @@ T createDispatcher(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String metricQueryServiceAddress,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist) throws Exception;
Expand Down
Expand Up @@ -55,7 +55,7 @@ public MiniDispatcher createDispatcher(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String metricQueryServiceAddress,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist) throws Exception {
Expand All @@ -74,7 +74,7 @@ public MiniDispatcher createDispatcher(
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
metricQueryServiceAddress,
archivedExecutionGraphStore,
DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
Expand Down
Expand Up @@ -67,7 +67,7 @@ public MiniDispatcher(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String metricQueryServiceAddress,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
Expand All @@ -84,7 +84,7 @@ public MiniDispatcher(
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
metricQueryServiceAddress,
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
Expand Down
Expand Up @@ -45,7 +45,7 @@ public Dispatcher createDispatcher(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String metricQueryServiceAddress,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist) throws Exception {
Expand All @@ -59,7 +59,7 @@ public Dispatcher createDispatcher(
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
metricQueryServiceAddress,
archivedExecutionGraphStore,
DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
Expand Down
Expand Up @@ -47,7 +47,7 @@ public StandaloneDispatcher(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String metricQueryServiceAddress,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
Expand All @@ -62,7 +62,7 @@ public StandaloneDispatcher(
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
metricQueryServiceAddress,
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
Expand All @@ -56,15 +55,14 @@
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -136,10 +134,6 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
@GuardedBy("lock")
private ExecutorService ioExecutor;

@GuardedBy("lock")
private ActorSystem metricQueryServiceActorSystem;

@GuardedBy("lock")
private ArchivedExecutionGraphStore archivedExecutionGraphStore;

private final Thread shutDownHook;
Expand Down Expand Up @@ -224,9 +218,7 @@ private void runCluster(Configuration configuration) throws Exception {
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);

clusterComponent.getShutDownFuture().whenComplete(
Expand Down Expand Up @@ -271,10 +263,8 @@ protected void initializeServices(Configuration configuration) throws Exception
heartbeatServices = createHeartbeatServices(configuration);
metricRegistry = createMetricRegistry(configuration);

// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
// Start actor system for metric query service on any available port
metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, bindAddress);
metricRegistry.startQueryService(metricQueryServiceRpcService, null);

archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
}
Expand Down Expand Up @@ -364,10 +354,6 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
terminationFutures.add(metricRegistry.shutdown());
}

if (metricQueryServiceActorSystem != null) {
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
}

if (ioExecutor != null) {
terminationFutures.add(ExecutorUtils.nonBlockingShutdown(shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor));
}
Expand Down
Expand Up @@ -189,7 +189,7 @@ public DispatcherResourceManagerComponent<T> create(
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricRegistry.getMetricQueryServicePath(),
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
archivedExecutionGraphStore,
fatalErrorHandler,
historyServerArchivist);
Expand Down
Expand Up @@ -76,10 +76,12 @@ public interface MetricRegistry {
ScopeFormats getScopeFormats();

/**
* Returns the path of the {@link MetricQueryService} or null, if none is started.
* Returns the RPC address of the {@link MetricQueryService} gateway or null, if none is started.
*
* @return Path of the MetricQueryService or null, if none is started
* @return RPC address of the MetricQueryService gateway or null, if none is started
*/
@Nullable
String getMetricQueryServicePath();
default String getMetricQueryServiceGatewayRpcAddress() {
return null;
}
}
Expand Up @@ -29,22 +29,20 @@
import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,10 +78,10 @@ public class MetricRegistryImpl implements MetricRegistry {
private final long maximumFramesize;

@Nullable
private ActorRef queryService;
private MetricQueryService queryService;

@Nullable
private String metricQueryServicePath;
private RpcService metricQueryServiceRpcService;

private ViewUpdater viewUpdater;

Expand All @@ -108,7 +106,7 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) {
this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

this.queryService = null;
this.metricQueryServicePath = null;
this.metricQueryServiceRpcService = null;

if (reporterConfigurations.isEmpty()) {
// no reporters defined
Expand Down Expand Up @@ -179,31 +177,56 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) {
/**
* Initializes the MetricQueryService.
*
* @param actorSystem ActorSystem to create the MetricQueryService on
* @param rpcService RpcService to create the MetricQueryService on
* @param resourceID resource ID used to disambiguate the actor name
*/
public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
public void startQueryService(RpcService rpcService, ResourceID resourceID) {
synchronized (lock) {
Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");

try {
queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID, maximumFramesize);
metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
metricQueryServiceRpcService = rpcService;
queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, maximumFramesize);
queryService.start();
} catch (Exception e) {
LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
}
}
}

/**
* Returns the rpc service that the {@link MetricQueryService} runs in.
*
* <p>The value is the {@link RpcService} that was handed to
* {@code startQueryService}; it is {@code null} as long as that method has not been called.
*
* @return rpc service of the MetricQueryService, or {@code null} if none has been set
*/
@Nullable
public RpcService getMetricQueryServiceRpcService() {
return metricQueryServiceRpcService;
}

/**
* Returns the address under which the {@link MetricQueryService} is reachable.
*
* @return address of the metric query service
*/
@Override
@Nullable
public String getMetricQueryServicePath() {
return metricQueryServicePath;
public String getMetricQueryServiceGatewayRpcAddress() {
if (queryService != null) {
return queryService.getSelfGateway(MetricQueryServiceGateway.class).getAddress();
} else {
return null;
}
}

/**
 * Returns the self gateway of the {@link MetricQueryService}, for use in tests.
 *
 * @return gateway of the metric query service, or {@code null} if no query service is set
 */
@VisibleForTesting
@Nullable
MetricQueryServiceGateway getMetricQueryServiceGateway() {
	// Guard clause: without a query service there is no gateway to hand out.
	if (queryService == null) {
		return null;
	}

	return queryService.getSelfGateway(MetricQueryServiceGateway.class);
}

@Override
Expand Down Expand Up @@ -260,13 +283,9 @@ public CompletableFuture<Void> shutdown() {
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
final Time gracePeriod = Time.seconds(1L);

if (queryService != null) {
final CompletableFuture<Void> queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown(
gracePeriod.toMilliseconds(),
TimeUnit.MILLISECONDS,
queryService);

terminationFutures.add(queryServiceTerminationFuture);
if (metricQueryServiceRpcService != null) {
final CompletableFuture<Void> metricQueryServiceRpcServiceTerminationFuture = metricQueryServiceRpcService.stopService();
terminationFutures.add(metricQueryServiceRpcServiceTerminationFuture);
}

Throwable throwable = null;
Expand Down Expand Up @@ -338,7 +357,7 @@ public void register(Metric metric, String metricName, AbstractMetricGroup group
}
try {
if (queryService != null) {
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
queryService.addMetric(metricName, metric, group);
}
} catch (Exception e) {
LOG.warn("Error while registering metric.", e);
Expand Down Expand Up @@ -378,7 +397,7 @@ public void unregister(Metric metric, String metricName, AbstractMetricGroup gro
}
try {
if (queryService != null) {
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
queryService.removeMetric(metric);
}
} catch (Exception e) {
LOG.warn("Error while registering metric.", e);
Expand All @@ -400,7 +419,7 @@ public void unregister(Metric metric, String metricName, AbstractMetricGroup gro

@VisibleForTesting
@Nullable
public ActorRef getQueryService() {
MetricQueryService getQueryService() {
return queryService;
}

Expand Down

0 comments on commit 6e8a3f3

Please sign in to comment.