Skip to content

Commit

Permalink
[FLINK-33212][core] add job status changed listener for lineage
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu authored and Peter (ACS) Huang committed May 20, 2024
1 parent 86de622 commit ca78b3c
Show file tree
Hide file tree
Showing 28 changed files with 786 additions and 18 deletions.
81 changes: 81 additions & 0 deletions docs/content/docs/deployment/advanced/job_status_listener.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

---
title: "Job Status Changed Listener"
nav-title: job-status-listener
nav-parent_id: advanced
nav-pos: 3
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Job status changed listener
Flink provides a pluggable interface for users to register their custom logic for handling with the job status changes in which lineage info about source/sink is provided.
This enables users to implement their own flink lineage reporter to send lineage info to third party data lineage systems for example Datahub and Openlineage.

The job status changed listeners are triggered every time status change happened for the application. The data lineage info is included in the JobCreatedEvent.

### Implement a plugin for your custom enricher

To implement a custom JobStatusChangedListener plugin, you need to:

- Add your own JobStatusChangedListener by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java" name="JobStatusChangedListener" >}} interface.

- Add your own JobStatusChangedListenerFactory by implementing the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java" name="JobStatusChangedListenerFactory" >}} interface.

- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.execution.JobStatusChangedListenerFactory` which contains the class name of your job status changed listener factory class (see [Java Service Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html) docs for more details).


Then, create a jar which includes your `JobStatusChangedListener`, `JobStatusChangedListenerFactory`, `META-INF/services/` and all external dependencies.
Make a directory in `plugins/` of your Flink distribution with an arbitrary name, e.g. "job-status-changed-listener", and put the jar into this directory.
See [Flink Plugin]({{< ref "docs/deployment/filesystems/plugins" >}}) for more details.

JobStatusChangedListenerFactory example:

``` java
package org.apache.flink.test.execution;

public static class TestingJobStatusChangedListenerFactory
implements JobStatusChangedListenerFactory {

@Override
public JobStatusChangedListener createListener(Context context) {
return new TestingJobStatusChangedListener();
}
}
```

JobStatusChangedListener example:

``` java
package org.apache.flink.test.execution;

private static class TestingJobStatusChangedListener implements JobStatusChangedListener {

@Override
public void onEvent(JobStatusChangedEvent event) {
statusChangedEvents.add(event);
}
}
```

### Configuration

Flink components loads JobStatusChangedListener plugins at startup. To make sure your JobStatusChangedListeners are loaded all class names should be defined as part of [execution.job-status-changed-listeners]({{< ref "docs/deployment/config#execution.job-status-changed-listeners" >}}).
If this configuration is empty, NO enrichers will be started. Example:
```
execution.job-status-changed-listeners = org.apache.flink.test.execution.TestingJobStatusChangedListenerFactory
```
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
<td>List&lt;String&gt;</td>
<td>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.</td>
</tr>
<tr>
<td><h5>execution.job-status-changed-listeners</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>When job is created or its status is changed, Flink will generate job event and notify job status changed listener.</td>
</tr>
<tr>
<td><h5>execution.program-config.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionUtils;

import org.slf4j.Logger;
Expand All @@ -41,9 +44,12 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -57,6 +63,10 @@ public class EmbeddedExecutor implements PipelineExecutor {

private static final Logger LOG = LoggerFactory.getLogger(EmbeddedExecutor.class);

private final ExecutorService executorService =
Executors.newFixedThreadPool(
1, new ExecutorThreadFactory("Flink-EmbeddedClusterExecutor-IO"));

public static final String NAME = "embedded";

private final Collection<JobID> submittedJobIds;
Expand All @@ -65,6 +75,8 @@ public class EmbeddedExecutor implements PipelineExecutor {

private final EmbeddedJobClientCreator jobClientCreator;

private final List<JobStatusChangedListener> jobStatusChangedListeners;

/**
* Creates a {@link EmbeddedExecutor}.
*
Expand All @@ -73,14 +85,22 @@ public class EmbeddedExecutor implements PipelineExecutor {
* caller.
* @param dispatcherGateway the dispatcher of the cluster which is going to be used to submit
* jobs.
* @param configuration the flink application configuration
* @param jobClientCreator the job client creator
*/
public EmbeddedExecutor(
final Collection<JobID> submittedJobIds,
final DispatcherGateway dispatcherGateway,
final Configuration configuration,
final EmbeddedJobClientCreator jobClientCreator) {
this.submittedJobIds = checkNotNull(submittedJobIds);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
this.jobClientCreator = checkNotNull(jobClientCreator);
this.jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
configuration,
executorService);
}

@Override
Expand Down Expand Up @@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
return jobId;
}))
.thenApplyAsync(
jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
.whenCompleteAsync(
(jobClient, throwable) -> {
if (throwable == null) {
PipelineExecutorUtils.notifyJobStatusListeners(
pipeline, jobGraph, jobStatusChangedListeners);
}
});
}

private static CompletableFuture<JobID> submitJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
return new EmbeddedExecutor(
submittedJobIds,
dispatcherGateway,
configuration,
(jobId, userCodeClassloader) -> {
final Time timeout =
Time.milliseconds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
return new EmbeddedExecutor(
submittedJobIds,
dispatcherGateway,
configuration,
(jobId, userCodeClassloader) -> new WebSubmissionJobClient(jobId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionUtils;

import javax.annotation.Nonnull;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -56,10 +62,23 @@ public class AbstractSessionClusterExecutor<
ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
implements CacheSupportedPipelineExecutor {

private final ExecutorService executorService =
Executors.newFixedThreadPool(
1, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

private final ClientFactory clusterClientFactory;
private final Configuration configuration;
private final List<JobStatusChangedListener> jobStatusChangedListeners;

public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
public AbstractSessionClusterExecutor(
@Nonnull final ClientFactory clusterClientFactory, Configuration configuration) {
this.clusterClientFactory = checkNotNull(clusterClientFactory);
this.configuration = configuration;
this.jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
configuration,
executorService);
}

@Override
Expand Down Expand Up @@ -97,7 +116,14 @@ public CompletableFuture<JobClient> execute(
clusterClientProvider,
jobID,
userCodeClassloader))
.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
.whenCompleteAsync(
(jobClient, throwable) -> {
if (throwable == null) {
PipelineExecutorUtils.notifyJobStatusListeners(
pipeline, jobGraph, jobStatusChangedListeners);
}
clusterClient.close();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -41,11 +47,14 @@
/** An {@link PipelineExecutor} for executing a {@link Pipeline} locally. */
@Internal
public class LocalExecutor implements PipelineExecutor {
private final ExecutorService executorService =
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("Flink-LocalExecutor-IO"));

public static final String NAME = "local";

private final Configuration configuration;
private final Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory;
private final List<JobStatusChangedListener> jobStatusChangedListeners;

public static LocalExecutor create(Configuration configuration) {
return new LocalExecutor(configuration, MiniCluster::new);
Expand All @@ -62,6 +71,11 @@ private LocalExecutor(
Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory) {
this.configuration = configuration;
this.miniClusterFactory = miniClusterFactory;
this.jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
configuration,
executorService);
}

@Override
Expand All @@ -81,7 +95,14 @@ public CompletableFuture<JobClient> execute(
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);

return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
.submitJob(jobGraph, userCodeClassloader);
.submitJob(jobGraph, userCodeClassloader)
.whenComplete(
(ignored, throwable) -> {
if (throwable == null) {
PipelineExecutorUtils.notifyJobStatusListeners(
pipeline, jobGraph, jobStatusChangedListeners);
}
});
}

private JobGraph getJobGraph(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,33 @@
package org.apache.flink.client.deployment.executors;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.net.MalformedURLException;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Utility class with method related to job execution. */
public class PipelineExecutorUtils {
private static final Logger LOG = LoggerFactory.getLogger(PipelineExecutorUtils.class);

/**
* Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
Expand Down Expand Up @@ -80,4 +90,35 @@ public static JobGraph getJobGraph(

return jobGraph;
}

/**
* Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
*
* @param pipeline the pipeline that contains lineage graph information.
* @param jobGraph jobGraph that contains job basic info
* @param listeners the list of job status changed listeners
*/
public static void notifyJobStatusListeners(
@Nonnull final Pipeline pipeline,
@Nonnull final JobGraph jobGraph,
List<JobStatusChangedListener> listeners) {
RuntimeExecutionMode executionMode =
jobGraph.getJobConfiguration().get(ExecutionOptions.RUNTIME_MODE);
listeners.forEach(
listener -> {
try {
listener.onEvent(
new DefaultJobCreatedEvent(
jobGraph.getJobID(),
jobGraph.getName(),
((StreamGraph) pipeline).getLineageGraph(),
executionMode));
} catch (Throwable e) {
LOG.error(
"Fail to notify job status changed listener {}",
listener.getClass().getName(),
e);
}
});
}
}

0 comments on commit ca78b3c

Please sign in to comment.