From 6cfbd92907d004bced07b23e17e90aefcc94b481 Mon Sep 17 00:00:00 2001 From: Zhenqiu Huang Date: Mon, 29 Apr 2024 14:56:34 -0700 Subject: [PATCH] [FLINK-33212][core] add job status changed listener for lineage --- .../advanced/job_status_listener.md | 81 +++++++++++++ .../generated/deployment_configuration.html | 6 + .../executors/EmbeddedExecutor.java | 29 ++++- .../executors/EmbeddedExecutorFactory.java | 1 + .../WebSubmissionExecutorFactory.java | 1 + .../AbstractSessionClusterExecutor.java | 30 ++++- .../deployment/executors/LocalExecutor.java | 23 +++- .../executors/PipelineExecutorUtils.java | 41 +++++++ .../deployment/executors/RemoteExecutor.java | 5 +- .../executors/RemoteExecutorFactory.java | 2 +- .../configuration/DeploymentOptions.java | 8 ++ .../DefaultJobExecutionStatusEvent.java | 74 ++++++++++++ .../execution/JobExecutionStatusEvent.java | 38 ++++++ .../core/execution/JobStatusChangedEvent.java | 31 +++++ .../execution/JobStatusChangedListener.java | 32 +++++ .../JobStatusChangedListenerFactory.java | 54 +++++++++ .../JobStatusChangedListenerUtils.java | 79 ++++++++++++ .../KubernetesSessionClusterExecutor.java | 5 +- ...bernetesSessionClusterExecutorFactory.java | 2 +- .../executiongraph/DefaultExecutionGraph.java | 22 +++- .../DefaultExecutionGraphBuilder.java | 9 +- .../streaming/api/graph/StreamGraph.java | 6 + .../execution/DefaultJobCreatedEvent.java | 64 ++++++++++ .../runtime/execution/JobCreatedEvent.java | 35 ++++++ .../JobStatusChangedListenerITCase.java | 113 ++++++++++++++++++ .../executors/YarnSessionClusterExecutor.java | 5 +- .../YarnSessionClusterExecutorFactory.java | 2 +- 27 files changed, 781 insertions(+), 17 deletions(-) create mode 100644 docs/content/docs/deployment/advanced/job_status_listener.md create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java diff --git a/docs/content/docs/deployment/advanced/job_status_listener.md b/docs/content/docs/deployment/advanced/job_status_listener.md new file mode 100644 index 00000000000000..723cc862594689 --- /dev/null +++ b/docs/content/docs/deployment/advanced/job_status_listener.md @@ -0,0 +1,81 @@ + +--- +title: "Job Status Changed Listener" +nav-title: job-status-listener +nav-parent_id: advanced +nav-pos: 3 +--- + + +## Job status changed listener +Flink provides a pluggable interface for users to register their custom logic for handling with the job status changes in which lineage info about source/sink is provided. +This enables users to implement their own flink lineage reporter to send lineage info to third party data lineage systems for example Datahub and Openlineage. + +The job status changed listeners are triggered every time status change happened for the application. The data lineage info is included in the JobCreatedEvent. + +### Implement a plugin for your custom enricher + +To implement a custom JobStatusChangedListener plugin, you need to: + +- Add your own JobStatusChangedListener by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java" name="JobStatusChangedListener" >}} interface. + +- Add your own JobStatusChangedListenerFactory by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java" name="JobStatusChangedListenerFactory" >}} interface. + +- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.execution.JobStatusChangedListenerFactory` which contains the class name of your job status changed listener factory class (see [Java Service Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html) docs for more details). + + +Then, create a jar which includes your `JobStatusChangedListener`, `JobStatusChangedListenerFactory`, `META-INF/services/` and all external dependencies. +Make a directory in `plugins/` of your Flink distribution with an arbitrary name, e.g. "job-status-changed-listener", and put the jar into this directory. +See [Flink Plugin]({{< ref "docs/deployment/filesystems/plugins" >}}) for more details. + +JobStatusChangedListenerFactory example: + +``` java +package org.apache.flink.test.execution; + +public static class TestingJobStatusChangedListenerFactory + implements JobStatusChangedListenerFactory { + + @Override + public JobStatusChangedListener createListener(Context context) { + return new TestingJobStatusChangedListener(); + } +} +``` + +JobStatusChangedListener example: + +``` java +package org.apache.flink.test.execution; + +private static class TestingJobStatusChangedListener implements JobStatusChangedListener { + + @Override + public void onEvent(JobStatusChangedEvent event) { + statusChangedEvents.add(event); + } +} +``` + +### Configuration + +Flink components loads JobStatusChangedListener plugins at startup. To make sure your JobStatusChangedListeners are loaded all class names should be defined as part of [execution.job-status-changed-listeners]({{< ref "docs/deployment/config#execution.job-status-changed-listeners" >}}). + If this configuration is empty, NO enrichers will be started. Example: +``` + execution.job-status-changed-listeners = org.apache.flink.test.execution.TestingJobStatusChangedListenerFactory +``` diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html b/docs/layouts/shortcodes/generated/deployment_configuration.html index 07daa602a07543..f3e08ecd6964fa 100644 --- a/docs/layouts/shortcodes/generated/deployment_configuration.html +++ b/docs/layouts/shortcodes/generated/deployment_configuration.html @@ -20,6 +20,12 @@ List<String> Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments. + +
execution.job-status-changed-listeners
+ (none) + List<String> + When job is created or its status is changed, Flink will generate job event and notify job status changed listener. +
execution.program-config.enabled
true diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index 7094c1cac130d8..09b6443e2f4cb0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -27,12 +27,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.client.ClientUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.function.FunctionUtils; import org.slf4j.Logger; @@ -41,9 +44,12 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -57,6 +63,10 @@ public class EmbeddedExecutor implements PipelineExecutor { private static final Logger LOG = LoggerFactory.getLogger(EmbeddedExecutor.class); + private final ExecutorService executorService = + Executors.newFixedThreadPool( + 1, new ExecutorThreadFactory("Flink-EmbeddedClusterExecutor-IO")); + public static final String NAME = "embedded"; private final Collection submittedJobIds; @@ -65,6 +75,8 @@ public class EmbeddedExecutor implements PipelineExecutor { private final EmbeddedJobClientCreator jobClientCreator; + private final List jobStatusChangedListeners; + /** * Creates a {@link EmbeddedExecutor}. * @@ -73,14 +85,22 @@ public class EmbeddedExecutor implements PipelineExecutor { * caller. * @param dispatcherGateway the dispatcher of the cluster which is going to be used to submit * jobs. + * @param configuration the flink application configuration + * @param jobClientCreator the job client creator */ public EmbeddedExecutor( final Collection submittedJobIds, final DispatcherGateway dispatcherGateway, + final Configuration configuration, final EmbeddedJobClientCreator jobClientCreator) { this.submittedJobIds = checkNotNull(submittedJobIds); this.dispatcherGateway = checkNotNull(dispatcherGateway); this.jobClientCreator = checkNotNull(jobClientCreator); + this.jobStatusChangedListeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + Thread.currentThread().getContextClassLoader(), + configuration, + executorService); } @Override @@ -153,7 +173,14 @@ private CompletableFuture submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( - jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); + jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { + PipelineExecutorUtils.notifyJobStatusListeners( + pipeline, jobGraph, jobStatusChangedListeners); + } + }); } private static CompletableFuture submitJob( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java index 0810c1e1c06fc5..790fffb0a1fdc2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java @@ -79,6 +79,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) { return new EmbeddedExecutor( submittedJobIds, dispatcherGateway, + configuration, (jobId, userCodeClassloader) -> { final Time timeout = Time.milliseconds( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java index f087b5f30db0b0..16e3bf0581358e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java @@ -75,6 +75,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) { return new EmbeddedExecutor( submittedJobIds, dispatcherGateway, + configuration, (jobId, userCodeClassloader) -> new WebSubmissionJobClient(jobId)); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java index 9397e506ccf7cf..8da7262bb1fb1f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java @@ -29,16 +29,22 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.CacheSupportedPipelineExecutor; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.function.FunctionUtils; import javax.annotation.Nonnull; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -56,10 +62,23 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory> implements CacheSupportedPipelineExecutor { + private final ExecutorService executorService = + Executors.newFixedThreadPool( + 1, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); + private final ClientFactory clusterClientFactory; + private final Configuration configuration; + private final List jobStatusChangedListeners; - public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + public AbstractSessionClusterExecutor( + @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) { this.clusterClientFactory = checkNotNull(clusterClientFactory); + this.configuration = configuration; + this.jobStatusChangedListeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + Thread.currentThread().getContextClassLoader(), + configuration, + executorService); } @Override @@ -97,7 +116,14 @@ public CompletableFuture execute( clusterClientProvider, jobID, userCodeClassloader)) - .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { + PipelineExecutorUtils.notifyJobStatusListeners( + pipeline, jobGraph, jobStatusChangedListeners); + } + clusterClient.close(); + }); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java index b8b60fe9adff1d..5361d332fc8279 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java @@ -26,13 +26,19 @@ import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import java.net.MalformedURLException; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,11 +47,14 @@ /** An {@link PipelineExecutor} for executing a {@link Pipeline} locally. */ @Internal public class LocalExecutor implements PipelineExecutor { + private final ExecutorService executorService = + Executors.newFixedThreadPool(1, new ExecutorThreadFactory("Flink-LocalExecutor-IO")); public static final String NAME = "local"; private final Configuration configuration; private final Function miniClusterFactory; + private final List jobStatusChangedListeners; public static LocalExecutor create(Configuration configuration) { return new LocalExecutor(configuration, MiniCluster::new); @@ -62,6 +71,11 @@ private LocalExecutor( Function miniClusterFactory) { this.configuration = configuration; this.miniClusterFactory = miniClusterFactory; + this.jobStatusChangedListeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + Thread.currentThread().getContextClassLoader(), + configuration, + executorService); } @Override @@ -81,7 +95,14 @@ public CompletableFuture execute( final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader); return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory) - .submitJob(jobGraph, userCodeClassloader); + .submitJob(jobGraph, userCodeClassloader) + .whenComplete( + (ignored, throwable) -> { + if (throwable == null) { + PipelineExecutorUtils.notifyJobStatusListeners( + pipeline, jobGraph, jobStatusChangedListeners); + } + }); } private JobGraph getJobGraph( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java index 1858315a0c26da..c96261dce87d1a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java @@ -19,23 +19,33 @@ package org.apache.flink.client.deployment.executors; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.core.execution.JobStatusChangedListener; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.net.MalformedURLException; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; /** Utility class with method related to job execution. */ public class PipelineExecutorUtils { + private static final Logger LOG = LoggerFactory.getLogger(PipelineExecutorUtils.class); /** * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}. @@ -80,4 +90,35 @@ public static JobGraph getJobGraph( return jobGraph; } + + /** + * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}. + * + * @param pipeline the pipeline that contains lineage graph information. + * @param jobGraph jobGraph that contains job basic info + * @param listeners the list of job status changed listeners + */ + public static void notifyJobStatusListeners( + @Nonnull final Pipeline pipeline, + @Nonnull final JobGraph jobGraph, + List listeners) { + RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration().get(ExecutionOptions.RUNTIME_MODE); + listeners.forEach( + listener -> { + try { + listener.onEvent( + new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + ((StreamGraph) pipeline).getLineageGraph(), + executionMode)); + } catch (Throwable e) { + LOG.error( + "Fail to notify job status changed listener {}", + listener.getClass().getName(), + e); + } + }); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java index 761d3cc91410e6..cbf66cd5edbbc4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.client.deployment.StandaloneClientFactory; import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.PipelineExecutor; /** The {@link PipelineExecutor} to be used when executing a job on an already running cluster. */ @@ -30,7 +31,7 @@ public class RemoteExecutor public static final String NAME = "remote"; - public RemoteExecutor() { - super(new StandaloneClientFactory()); + public RemoteExecutor(Configuration configuration) { + super(new StandaloneClientFactory(), configuration); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java index 5548156aa0cc41..bbcb7465e35824 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java @@ -40,6 +40,6 @@ public boolean isCompatibleWith(final Configuration configuration) { @Override public PipelineExecutor getExecutor(final Configuration configuration) { - return new RemoteExecutor(); + return new RemoteExecutor(configuration); } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java index f6ca0d8a345fd8..aeb58a59bc5bab 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java @@ -78,6 +78,14 @@ public class DeploymentOptions { "Custom JobListeners to be registered with the execution environment." + " The registered listeners cannot have constructors with arguments."); + public static final ConfigOption> JOB_STATUS_CHANGED_LISTENERS = + key("execution.job-status-changed-listeners") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "When job is created or its status is changed, Flink will generate job event and notify job status changed listener."); + public static final ConfigOption SHUTDOWN_ON_APPLICATION_FINISH = ConfigOptions.key("execution.shutdown-on-application-finish") .booleanType() diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java new file mode 100644 index 00000000000000..d8cb2da4568a4e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; + +import javax.annotation.Nullable; + +/** Default implementation for {@link JobExecutionStatusEvent}. */ +@Internal +public class DefaultJobExecutionStatusEvent implements JobExecutionStatusEvent { + private final JobID jobId; + private final String jobName; + private final JobStatus oldStatus; + private final JobStatus newStatus; + @Nullable private final Throwable cause; + + public DefaultJobExecutionStatusEvent( + JobID jobId, + String jobName, + JobStatus oldStatus, + JobStatus newStatus, + @Nullable Throwable cause) { + this.jobId = jobId; + this.jobName = jobName; + this.oldStatus = oldStatus; + this.newStatus = newStatus; + this.cause = cause; + } + + @Override + public JobStatus oldStatus() { + return oldStatus; + } + + @Override + public JobStatus newStatus() { + return newStatus; + } + + @Nullable + @Override + public Throwable exception() { + return cause; + } + + @Override + public JobID jobId() { + return jobId; + } + + @Override + public String jobName() { + return jobName; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java new file mode 100644 index 00000000000000..82aeda5c6dfc34 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobStatus; + +import javax.annotation.Nullable; + +/** Job execution status event. */ +@PublicEvolving +public interface JobExecutionStatusEvent extends JobStatusChangedEvent { + /** Old status for job. */ + JobStatus oldStatus(); + + /** New status for job. */ + JobStatus newStatus(); + + /** Exception for job. */ + @Nullable + Throwable exception(); +} diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java new file mode 100644 index 00000000000000..21d0717ed2806c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobID; + +/** Basic job status event. */ +@PublicEvolving +public interface JobStatusChangedEvent { + + JobID jobId(); + + String jobName(); +} diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java new file mode 100644 index 00000000000000..3610cd36055902 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * When job is created or its status is changed, Flink will generate job event and notify job status + * changed listener. + */ +@PublicEvolving +public interface JobStatusChangedListener { + + /* Event will be fired when job status is changed. */ + void onEvent(JobStatusChangedEvent event); +} diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java new file mode 100644 index 00000000000000..03c1d49a0d6497 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; + +import java.util.concurrent.Executor; + +/** Factory for job status changed listener. */ +@PublicEvolving +public interface JobStatusChangedListenerFactory { + + JobStatusChangedListener createListener(Context context); + + @PublicEvolving + interface Context { + /* + * Configuration for the factory to create listener, users can add customized options to flink and get them here to create the listener. For + * example, users can add rest address for datahub to the configuration, and get it when they need to create http client for the listener. + */ + Configuration getConfiguration(); + + /** + * User classloader for the flink application. + * + * @return + */ + ClassLoader getUserClassLoader(); + + /* + * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy. `JobMaster` will provide an independent executor + * for io operations and it won't block the main-thread. All tasks submitted to the executor will be executed in parallel, and when the job ends, + * previously submitted tasks will be executed, but no new tasks will be accepted. + */ + Executor getIOExecutor(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java new file mode 100644 index 00000000000000..a73641d7ffd6db --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS; + +/** Util class for {@link JobStatusChangedListener}. */ +@Internal +public final class JobStatusChangedListenerUtils { + /** + * Create job status changed listeners from configuration for job. + * + * @param configuration The job configuration. + * @return the job status changed listeners. + */ + public static List createJobStatusChangedListeners( + ClassLoader userClassLoader, Configuration configuration, Executor ioExecutor) { + List jobStatusChangedListeners = configuration.get(JOB_STATUS_CHANGED_LISTENERS); + if (jobStatusChangedListeners == null || jobStatusChangedListeners.isEmpty()) { + return Collections.emptyList(); + } + return jobStatusChangedListeners.stream() + .map( + fac -> { + try { + return InstantiationUtil.instantiate( + fac, + JobStatusChangedListenerFactory.class, + userClassLoader) + .createListener( + new JobStatusChangedListenerFactory.Context() { + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public ClassLoader getUserClassLoader() { + return userClassLoader; + } + + @Override + public Executor getIOExecutor() { + return ioExecutor; + } + }); + } catch (FlinkException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java index c9edef9077771b..84eba0e1565606 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; @@ -31,7 +32,7 @@ public class KubernetesSessionClusterExecutor public static final String NAME = KubernetesDeploymentTarget.SESSION.getName(); - public KubernetesSessionClusterExecutor() { - super(new KubernetesClusterClientFactory()); + public KubernetesSessionClusterExecutor(Configuration configuration) { + super(new KubernetesClusterClientFactory(), configuration); } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java index 90c6a62adf598e..196bc20103914f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java @@ -44,6 +44,6 @@ public boolean isCompatibleWith(@Nonnull final Configuration configuration) { @Override public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) { - return new KubernetesSessionClusterExecutor(); + return new KubernetesSessionClusterExecutor(configuration); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 0fb840a25a785c..54bf5a1f45081a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -27,6 +27,8 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent; +import org.apache.flink.core.execution.JobStatusChangedListener; import org.apache.flink.core.execution.JobStatusHook; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; @@ -302,6 +304,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory; + private final List jobStatusChangedListeners; + // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -327,7 +331,8 @@ public DefaultExecutionGraph( ExecutionJobVertex.Factory executionJobVertexFactory, List jobStatusHooks, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, - TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory) { + TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory, + List jobStatusChangedListeners) { this.jobType = jobType; this.executionGraphId = new ExecutionGraphID(); @@ -398,6 +403,8 @@ public DefaultExecutionGraph( this.taskDeploymentDescriptorFactory = checkNotNull(taskDeploymentDescriptorFactory); + this.jobStatusChangedListeners = checkNotNull(jobStatusChangedListeners); + LOG.info( "Created execution graph {} for job {}.", executionGraphId, @@ -1164,7 +1171,7 @@ private boolean transitionState(JobStatus current, JobStatus newState, Throwable error); stateTimestamps[newState.ordinal()] = System.currentTimeMillis(); - notifyJobStatusChange(newState); + notifyJobStatusChange(current, newState, error); notifyJobStatusHooks(newState, error); return true; } else { @@ -1599,7 +1606,8 @@ public void registerJobStatusListener(JobStatusListener listener) { } } - private void notifyJobStatusChange(JobStatus newState) { + private void notifyJobStatusChange( + JobStatus oldState, JobStatus newState, @Nullable Throwable cause) { if (jobStatusListeners.size() > 0) { final long timestamp = System.currentTimeMillis(); @@ -1611,6 +1619,14 @@ private void notifyJobStatusChange(JobStatus newState) { } } } + + if (jobStatusChangedListeners.size() > 0) { + jobStatusChangedListeners.forEach( + listener -> + listener.onEvent( + new DefaultJobExecutionStatusEvent( + getJobID(), getJobName(), oldState, newState, cause))); + } } private void notifyJobStatusHooks(JobStatus newState, @Nullable Throwable cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index 7e41af896df806..7ce6e6361afec1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -138,6 +140,10 @@ public static DefaultExecutionGraph buildGraph( throw new JobException("Could not create the TaskDeploymentDescriptorFactory.", e); } + final List jobStatusChangedListeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + classLoader, jobManagerConfig, ioExecutor); + // create a new execution graph, if none exists so far final DefaultExecutionGraph executionGraph = new DefaultExecutionGraph( @@ -161,7 +167,8 @@ public static DefaultExecutionGraph buildGraph( executionJobVertexFactory, jobGraph.getJobStatusHooks(), markPartitionFinishedStrategy, - taskDeploymentDescriptorFactory); + taskDeploymentDescriptorFactory, + jobStatusChangedListeners); // set the basic properties diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index fb1deadff6efa1..408bf00fd114db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.lineage.LineageGraph; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory; import org.apache.flink.streaming.api.operators.SourceOperatorFactory; @@ -134,6 +135,7 @@ public class StreamGraph implements Pipeline { private boolean dynamic; private boolean autoParallelismEnabled; + private LineageGraph lineageGraph; public StreamGraph( Configuration jobConfiguration, @@ -228,6 +230,10 @@ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { this.timeCharacteristic = timeCharacteristic; } + public LineageGraph getLineageGraph() { + return lineageGraph; + } + public GlobalStreamExchangeMode getGlobalStreamExchangeMode() { return globalExchangeMode; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java new file mode 100644 index 00000000000000..34ff064137ce21 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.execution; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.lineage.LineageGraph; + +/** Default implementation for {@link JobCreatedEvent}. */ +@Internal +public class DefaultJobCreatedEvent implements JobCreatedEvent { + private final JobID jobId; + private final String jobName; + private final LineageGraph lineageGraph; + private final RuntimeExecutionMode executionMode; + + public DefaultJobCreatedEvent( + JobID jobId, + String jobName, + LineageGraph lineageGraph, + RuntimeExecutionMode executionMode) { + this.jobId = jobId; + this.jobName = jobName; + this.lineageGraph = lineageGraph; + this.executionMode = executionMode; + } + + @Override + public JobID jobId() { + return jobId; + } + + @Override + public String jobName() { + return jobName; + } + + @Override + public LineageGraph lineageGraph() { + return lineageGraph; + } + + @Override + public RuntimeExecutionMode executionMode() { + return executionMode; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java new file mode 100644 index 00000000000000..1d20dcd21080bb --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.execution; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.core.execution.JobStatusChangedEvent; +import org.apache.flink.streaming.api.lineage.LineageGraph; + +/** Basic job created event. */ +@PublicEvolving +public interface JobCreatedEvent extends JobStatusChangedEvent { + + /* Lineage for the current job. */ + LineageGraph lineageGraph(); + + /* Runtime execution mode for the job, STREAMING/BATCH/AUTOMATIC. */ + RuntimeExecutionMode executionMode(); +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java new file mode 100644 index 00000000000000..1d9d9b8f1482a7 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.execution; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent; +import org.apache.flink.core.execution.JobExecutionStatusEvent; +import org.apache.flink.core.execution.JobStatusChangedEvent; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerFactory; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for job status changed listener. */ +public class JobStatusChangedListenerITCase { + private static List statusChangedEvents = new ArrayList<>(); + + @Test + void testJobStatusChanged() throws Exception { + Configuration configuration = new Configuration(); + configuration.set( + JOB_STATUS_CHANGED_LISTENERS, + Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName())); + try (StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration)) { + List sourceValues = Arrays.asList("a", "b", "c"); + List resultValues = new ArrayList<>(); + try (CloseableIterator iterator = + env.fromCollection(sourceValues).executeAndCollect()) { + while (iterator.hasNext()) { + resultValues.add(iterator.next()); + } + } + assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new String[0])); + } + assertThat(statusChangedEvents.size()).isEqualTo(3); + assertThat(statusChangedEvents.get(0).jobId()) + .isEqualTo(statusChangedEvents.get(1).jobId()); + assertThat(statusChangedEvents.get(0).jobName()) + .isEqualTo(statusChangedEvents.get(1).jobName()); + + assertThat(statusChangedEvents.get(1).jobId()) + .isEqualTo(statusChangedEvents.get(2).jobId()); + assertThat(statusChangedEvents.get(1).jobName()) + .isEqualTo(statusChangedEvents.get(2).jobName()); + + statusChangedEvents.forEach( + event -> { + if (event instanceof DefaultJobExecutionStatusEvent) { + JobExecutionStatusEvent status = (JobExecutionStatusEvent) event; + assertThat( + (status.oldStatus() == JobStatus.CREATED + && status.newStatus() == JobStatus.RUNNING) + || (status.oldStatus() == JobStatus.RUNNING + && status.newStatus() + == JobStatus.FINISHED)) + .isTrue(); + } else { + DefaultJobCreatedEvent createdEvent = (DefaultJobCreatedEvent) event; + assertThat(createdEvent.executionMode()) + .isEqualTo(RuntimeExecutionMode.STREAMING); + } + }); + } + + /** Testing job status changed listener factory. */ + public static class TestingJobStatusChangedListenerFactory + implements JobStatusChangedListenerFactory { + + @Override + public JobStatusChangedListener createListener(Context context) { + return new TestingJobStatusChangedListener(); + } + } + + /** Testing job status changed listener. */ + private static class TestingJobStatusChangedListener implements JobStatusChangedListener { + + @Override + public void onEvent(JobStatusChangedEvent event) { + statusChangedEvents.add(event); + } + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java index b14618e05152f3..065772daee84d6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.configuration.YarnDeploymentTarget; @@ -33,7 +34,7 @@ public class YarnSessionClusterExecutor public static final String NAME = YarnDeploymentTarget.SESSION.getName(); - public YarnSessionClusterExecutor() { - super(new YarnClusterClientFactory()); + public YarnSessionClusterExecutor(Configuration configuration) { + super(new YarnClusterClientFactory(), configuration); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java index e94a230b4cd4b6..3c7cb0f9044d0b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java @@ -45,7 +45,7 @@ public boolean isCompatibleWith(@Nonnull final Configuration configuration) { @Override public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) { try { - return new YarnSessionClusterExecutor(); + return new YarnSessionClusterExecutor(configuration); } catch (NoClassDefFoundError e) { throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE); }