
[FLINK-33211][table] support flink table lineage #24618

Open · wants to merge 1 commit into base: master
Conversation

@HuangZhenQiu (Contributor):

What is the purpose of the change

  1. Add a table lineage vertex into the transformation in the planner. The final LineageGraph is generated from the transformations and put into the StreamGraph. The lineage graph will be published to the lineage listener in a follow-up PR.
  2. Deprecated table sources and sinks are not considered, as there is not enough info to derive the name and namespace for a lineage dataset.

Brief change log

  • Add table lineage interfaces and default implementations.
  • Create lineage vertices and add them to transformations during the conversion from physical plan to transformations.
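As a rough sketch of the flow described above (hypothetical, heavily simplified names; Flink's real lineage interfaces carry facets and dataset lists and differ in detail):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the lineage types. This only
// illustrates the wiring: vertices are attached to transformations during
// plan conversion, then collected into a graph for the StreamGraph.
class LineageVertex {
    final String datasetName;
    LineageVertex(String datasetName) { this.datasetName = datasetName; }
}

class Transformation {
    private LineageVertex lineageVertex; // attached during plan-to-transformation conversion
    void setLineageVertex(LineageVertex v) { this.lineageVertex = v; }
    LineageVertex getLineageVertex() { return lineageVertex; }
}

class LineageGraph {
    final List<LineageVertex> sources = new ArrayList<>();
    final List<LineageVertex> sinks = new ArrayList<>();

    // Walk the transformations and collect whatever vertices were attached.
    static LineageGraph fromTransformations(
            List<Transformation> sourceTransforms, List<Transformation> sinkTransforms) {
        LineageGraph g = new LineageGraph();
        for (Transformation t : sourceTransforms) {
            if (t.getLineageVertex() != null) g.sources.add(t.getLineageVertex());
        }
        for (Transformation t : sinkTransforms) {
            if (t.getLineageVertex() != null) g.sinks.add(t.getLineageVertex());
        }
        return g;
    }
}
```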

Verifying this change

  1. Added TableLineageGraphTest for both stream and batch.
  2. Added LineageGraph verification in TransformationsTest for legacy sources.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@HuangZhenQiu (Contributor Author):

@flinkbot run azure

@HuangZhenQiu HuangZhenQiu force-pushed the support-table-lineage branch 3 times, most recently from 34f3667 to 3a01cfd Compare April 3, 2024 20:18
@flinkbot (Collaborator):

flinkbot commented Apr 4, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@HuangZhenQiu (Contributor Author):

@flinkbot run azure

@PatrickRen (Contributor) left a comment:

@HuangZhenQiu Thanks for the contribution! I left some comments.

Also, I found a lot of one-line changes removing blank lines in file headers. Could you split them into a separate hotfix commit, or directly revert them, as they are not really necessary?

@@ -34,6 +35,7 @@
public abstract class PhysicalTransformation<T> extends Transformation<T> {

private boolean supportsConcurrentExecutionAttempts = true;
private LineageVertex lineageVertex;
Contributor:

It looks like only source and sink transformations have lineage vertex. What about we only add it to source / sink transformations?

Contributor Author:

Makes sense. Added a TransformationWithLineage class for this purpose.
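A minimal sketch of that idea (hypothetical class shapes, not the actual Flink API): instead of giving every PhysicalTransformation a lineage field, only source/sink transformations extend a lineage-aware base class.

```java
import java.util.Optional;

// Hypothetical, simplified sketch. Only transformations that represent a
// dataset boundary (sources and sinks) carry a lineage vertex; intermediate
// transformations like a map stay lineage-free.
class LineageVertex {
    final String name;
    LineageVertex(String name) { this.name = name; }
}

abstract class Transformation<T> {}

abstract class TransformationWithLineage<T> extends Transformation<T> {
    private LineageVertex lineageVertex;

    public void setLineageVertex(LineageVertex v) { this.lineageVertex = v; }

    // Optional, since a vertex may not have been attached yet.
    public Optional<LineageVertex> getLineageVertex() {
        return Optional.ofNullable(lineageVertex);
    }
}

class SourceTransformation<T> extends TransformationWithLineage<T> {}

class MapTransformation<T> extends Transformation<T> {} // no lineage field
```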


public TableLineageDatasetImpl(ContextResolvedTable contextResolvedTable) {
this.name = contextResolvedTable.getIdentifier().asSummaryString();
this.namespace = inferNamespace(contextResolvedTable.getTable()).orElse("");
Contributor:

I'm not sure if the implementation here matches the definition on the interface. From Javadoc of LineageDataset:

/* Unique name for this dataset's storage, for example, url for jdbc connector and location for lakehouse connector. */

Let's take the JDBC connector as an example. My assumption is that the namespace should describe the URL of the database, or at least some identifier that can tell the difference between different DB instances. Here the implementation only writes jdbc as the namespace.
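The reviewer's point can be sketched as follows (hypothetical helper; `connector` and `url` follow Flink's JDBC table option names, but the inference logic here is illustrative, not the PR's actual code):

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical namespace inference from table options: for a JDBC table,
// prefer the JDBC URL (which identifies the storage instance) over the
// bare connector name, which cannot distinguish two DB instances.
class NamespaceInference {
    static String inferNamespace(Map<String, String> tableOptions) {
        String connector = tableOptions.getOrDefault("connector", "");
        if ("jdbc".equals(connector)) {
            // Fall back to the connector name only if no URL is configured.
            return Optional.ofNullable(tableOptions.get("url")).orElse(connector);
        }
        return connector;
    }
}
```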

@@ -90,6 +90,7 @@ protected Transformation<RowData> createConversionTransformationIfNeeded(
final RowType outputType = (RowType) getOutputType();
final Transformation<RowData> transformation;
final int[] fieldIndexes = computeIndexMapping(true);

Contributor:

This was added by mistake, I assume.

Contributor Author:

Yes.

@@ -123,6 +124,7 @@ public class StreamGraph implements Pipeline {
private CheckpointStorage checkpointStorage;
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
private InternalTimeServiceManager.Provider timerServiceProvider;
private LineageGraph lineageGraph;
Contributor:

The only usage of this field is for tests. Is it possible not to introduce it in StreamGraph?

Contributor Author:

As we discussed offline, we will keep it here.

import java.util.Map;

/** Default implementation for DatasetSchemaFacet. */
public class TableDataSetSchemaFacet implements DatasetSchemaFacet {
Contributor:

DataSet -> Dataset

Contributor:

Actually we don't need facets for table schema and table config. They are already included in TableLineageDataset#table.

Contributor Author:

I agree. It is just for exposing the data in a structured way. After the implementation, I feel we probably don't need to expose CatalogContext and CatalogBaseTable to users.

@HuangZhenQiu HuangZhenQiu force-pushed the support-table-lineage branch 2 times, most recently from e5f8a17 to 5f29a04 Compare April 29, 2024 04:32
@HuangZhenQiu (Contributor Author):

@PatrickRen
Thanks for reviewing the PR. For testing purposes, I only added a lineage provider implementation for the Values-related source functions and input format. I will add a lineage provider for Hive in a separate PR.

@HuangZhenQiu (Contributor Author):

@davidradl
Thanks for reviewing this PR. It mainly handles source/sink-level lineage; column-level lineage will need further discussion in the community. Resolved most of your comments.

@HuangZhenQiu HuangZhenQiu force-pushed the support-table-lineage branch 2 times, most recently from 47ec379 to 81da89d Compare May 6, 2024 17:46
@HuangZhenQiu (Contributor Author):

@PatrickRen
I have removed the schema and config facets, given that this info is already provided by CatalogBaseTable. It greatly reduced the size of the PR. Would you please take one more round of review?
