Skip to content

Commit

Permalink
[FLINK-9016] [flip6] Properly unregister jobs from JobMetricGroup
Browse files Browse the repository at this point in the history
This commit properly removes jobs from the JobMetricGroup once a job
has reached a terminal state.
  • Loading branch information
tillrohrmann committed Mar 18, 2018
1 parent 91d346e commit c4a1d09
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 91 deletions.
Expand Up @@ -49,7 +49,8 @@
import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
Expand Down Expand Up @@ -97,7 +98,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private final JobManagerSharedServices jobManagerSharedServices; private final JobManagerSharedServices jobManagerSharedServices;
private final HeartbeatServices heartbeatServices; private final HeartbeatServices heartbeatServices;
private final BlobServer blobServer; private final BlobServer blobServer;
private final MetricRegistry metricRegistry;


private final FatalErrorHandler fatalErrorHandler; private final FatalErrorHandler fatalErrorHandler;


Expand All @@ -109,6 +109,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme


private final JobManagerRunnerFactory jobManagerRunnerFactory; private final JobManagerRunnerFactory jobManagerRunnerFactory;


private final JobManagerMetricGroup jobManagerMetricGroup;

@Nullable
private final String metricQueryServicePath;

@Nullable @Nullable
protected final String restAddress; protected final String restAddress;


Expand All @@ -123,7 +128,8 @@ public Dispatcher(
ResourceManagerGateway resourceManagerGateway, ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer, BlobServer blobServer,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricServiceQueryPath,
ArchivedExecutionGraphStore archivedExecutionGraphStore, ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory, JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
Expand All @@ -135,9 +141,10 @@ public Dispatcher(
this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway); this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway);
this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices); this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
this.blobServer = Preconditions.checkNotNull(blobServer); this.blobServer = Preconditions.checkNotNull(blobServer);
this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore); this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore);
this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup);
this.metricQueryServicePath = metricServiceQueryPath;


this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(
configuration, configuration,
Expand Down Expand Up @@ -192,6 +199,8 @@ public CompletableFuture<Void> postStop() {
exception = ExceptionUtils.firstOrSuppressed(e, exception); exception = ExceptionUtils.firstOrSuppressed(e, exception);
} }


jobManagerMetricGroup.close();

if (exception != null) { if (exception != null) {
throw exception; throw exception;
} else { } else {
Expand Down Expand Up @@ -251,7 +260,8 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
heartbeatServices, heartbeatServices,
blobServer, blobServer,
jobManagerSharedServices, jobManagerSharedServices,
metricRegistry, jobManagerMetricGroup.addJob(jobGraph),
metricQueryServicePath,
restAddress); restAddress);


jobManagerRunner.getResultFuture().whenCompleteAsync( jobManagerRunner.getResultFuture().whenCompleteAsync(
Expand Down Expand Up @@ -464,8 +474,6 @@ public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout)


@Override @Override
public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) { public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
final String metricQueryServicePath = metricRegistry.getMetricQueryServicePath();

if (metricQueryServicePath != null) { if (metricQueryServicePath != null) {
return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath)); return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
} else { } else {
Expand Down Expand Up @@ -513,6 +521,8 @@ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture); registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture);
} }


jobManagerMetricGroup.removeJob(jobId);

if (cleanupHA) { if (cleanupHA) {
submittedJobGraphStore.removeJobGraph(jobId); submittedJobGraphStore.removeJobGraph(jobId);
} }
Expand Down Expand Up @@ -725,7 +735,8 @@ JobManagerRunner createJobManagerRunner(
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
BlobServer blobServer, BlobServer blobServer,
JobManagerSharedServices jobManagerServices, JobManagerSharedServices jobManagerServices,
MetricRegistry metricRegistry, JobManagerJobMetricGroup jobManagerJobMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String restAddress) throws Exception; @Nullable String restAddress) throws Exception;
} }


Expand All @@ -745,7 +756,8 @@ public JobManagerRunner createJobManagerRunner(
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
BlobServer blobServer, BlobServer blobServer,
JobManagerSharedServices jobManagerServices, JobManagerSharedServices jobManagerServices,
MetricRegistry metricRegistry, JobManagerJobMetricGroup jobManagerJobMetricGroup,
@Nullable String metricQueryServicePath,
@Nullable String restAddress) throws Exception { @Nullable String restAddress) throws Exception {
return new JobManagerRunner( return new JobManagerRunner(
resourceId, resourceId,
Expand All @@ -756,7 +768,8 @@ public JobManagerRunner createJobManagerRunner(
heartbeatServices, heartbeatServices,
blobServer, blobServer,
jobManagerServices, jobManagerServices,
metricRegistry, jobManagerJobMetricGroup,
metricQueryServicePath,
restAddress); restAddress);
} }
} }
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -60,7 +60,8 @@ public MiniDispatcher(
ResourceManagerGateway resourceManagerGateway, ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer, BlobServer blobServer,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore, ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory, JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
Expand All @@ -76,7 +77,8 @@ public MiniDispatcher(
resourceManagerGateway, resourceManagerGateway,
blobServer, blobServer,
heartbeatServices, heartbeatServices,
metricRegistry, jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore, archivedExecutionGraphStore,
jobManagerRunnerFactory, jobManagerRunnerFactory,
fatalErrorHandler, fatalErrorHandler,
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
Expand All @@ -45,7 +45,8 @@ public StandaloneDispatcher(
ResourceManagerGateway resourceManagerGateway, ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer, BlobServer blobServer,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore, ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory, JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
Expand All @@ -59,7 +60,8 @@ public StandaloneDispatcher(
resourceManagerGateway, resourceManagerGateway,
blobServer, blobServer,
heartbeatServices, heartbeatServices,
metricRegistry, jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore, archivedExecutionGraphStore,
jobManagerRunnerFactory, jobManagerRunnerFactory,
fatalErrorHandler, fatalErrorHandler,
Expand Down
Expand Up @@ -51,6 +51,8 @@
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
Expand Down Expand Up @@ -154,6 +156,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
@GuardedBy("lock") @GuardedBy("lock")
private ClusterInformation clusterInformation; private ClusterInformation clusterInformation;


@GuardedBy("lock")
private JobManagerMetricGroup jobManagerMetricGroup;

protected ClusterEntrypoint(Configuration configuration) { protected ClusterEntrypoint(Configuration configuration) {
this.configuration = Preconditions.checkNotNull(configuration); this.configuration = Preconditions.checkNotNull(configuration);
this.terminationFuture = new CompletableFuture<>(); this.terminationFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -327,14 +332,17 @@ protected void startClusterComponents(
clusterInformation, clusterInformation,
webMonitorEndpoint.getRestAddress()); webMonitorEndpoint.getRestAddress());


jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());

dispatcher = createDispatcher( dispatcher = createDispatcher(
configuration, configuration,
rpcService, rpcService,
highAvailabilityServices, highAvailabilityServices,
resourceManager.getSelfGateway(ResourceManagerGateway.class), resourceManager.getSelfGateway(ResourceManagerGateway.class),
blobServer, blobServer,
heartbeatServices, heartbeatServices,
metricRegistry, jobManagerMetricGroup,
metricRegistry.getMetricQueryServicePath(),
archivedExecutionGraphStore, archivedExecutionGraphStore,
this, this,
webMonitorEndpoint.getRestAddress()); webMonitorEndpoint.getRestAddress());
Expand Down Expand Up @@ -488,7 +496,19 @@ protected CompletableFuture<Void> stopClusterComponents() {
terminationFutures.add(FutureUtils.completedExceptionally(exception)); terminationFutures.add(FutureUtils.completedExceptionally(exception));
} }


return FutureUtils.completeAll(terminationFutures); final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);

if (jobManagerMetricGroup != null) {
return FutureUtils.runAfterwards(
componentTerminationFuture,
() -> {
synchronized (lock) {
jobManagerMetricGroup.close();
}
});
} else {
return componentTerminationFuture;
}
} }
} }


Expand Down Expand Up @@ -567,7 +587,8 @@ protected abstract Dispatcher createDispatcher(
ResourceManagerGateway resourceManagerGateway, ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer, BlobServer blobServer,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore, ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress) throws Exception; @Nullable String restAddress) throws Exception;
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint; import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
Expand Down Expand Up @@ -95,7 +95,8 @@ protected Dispatcher createDispatcher(
ResourceManagerGateway resourceManagerGateway, ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer, BlobServer blobServer,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore, ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler, FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress) throws Exception { @Nullable String restAddress) throws Exception {
Expand All @@ -114,7 +115,8 @@ protected Dispatcher createDispatcher(
resourceManagerGateway, resourceManagerGateway,
blobServer, blobServer,
heartbeatServices, heartbeatServices,
metricRegistry, jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore, archivedExecutionGraphStore,
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler, fatalErrorHandler,
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
Expand Down Expand Up @@ -104,16 +104,17 @@ protected DispatcherRestEndpoint createRestEndpoint(


@Override @Override
protected Dispatcher createDispatcher( protected Dispatcher createDispatcher(
Configuration configuration, Configuration configuration,
RpcService rpcService, RpcService rpcService,
HighAvailabilityServices highAvailabilityServices, HighAvailabilityServices highAvailabilityServices,
ResourceManagerGateway resourceManagerGateway, ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer, BlobServer blobServer,
HeartbeatServices heartbeatServices, HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, JobManagerMetricGroup jobManagerMetricGroup,
ArchivedExecutionGraphStore archivedExecutionGraphStore, @Nullable String metricQueryServicePath,
FatalErrorHandler fatalErrorHandler, ArchivedExecutionGraphStore archivedExecutionGraphStore,
@Nullable String restAddress) throws Exception { FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress) throws Exception {


// create the default dispatcher // create the default dispatcher
return new StandaloneDispatcher( return new StandaloneDispatcher(
Expand All @@ -124,7 +125,8 @@ protected Dispatcher createDispatcher(
resourceManagerGateway, resourceManagerGateway,
blobServer, blobServer,
heartbeatServices, heartbeatServices,
metricRegistry, jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore, archivedExecutionGraphStore,
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler, fatalErrorHandler,
Expand Down

0 comments on commit c4a1d09

Please sign in to comment.