Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/dbt): dbt column-level lineage #8991

Merged
merged 26 commits into from Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/airflow-plugin/setup.py
Expand Up @@ -14,7 +14,7 @@ def get_long_description():
return pathlib.Path(os.path.join(root, "README.md")).read_text()


_version = package_metadata["__version__"]
_version: str = package_metadata["__version__"]
_self_pin = f"=={_version}" if not _version.endswith("dev0") else ""


Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
Expand Up @@ -305,8 +305,8 @@
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"delta-lake": {*data_lake_profiling, *delta_lake},
"dbt": {"requests"} | aws_common,
"dbt-cloud": {"requests"},
"dbt": {"requests"} | sqlglot_lib | aws_common,
"dbt-cloud": {"requests"} | sqlglot_lib,
"druid": sql_common | {"pydruid>=0.6.2"},
"dynamodb": aws_common,
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
Expand Down
Expand Up @@ -15,7 +15,7 @@
from datahub.specific.dataset import DatasetPatchBuilder


def _convert_upstream_lineage_to_patch(
def convert_upstream_lineage_to_patch(
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
Expand Down Expand Up @@ -86,16 +86,11 @@ def _merge_upstream_lineage(


def _lineage_wu_via_read_modify_write(
graph: Optional[DataHubGraph],
graph: DataHubGraph,
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
if gms_aspect:
new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
Expand Down Expand Up @@ -131,11 +126,16 @@ def auto_incremental_lineage(
yield wu

if lineage_aspect.fineGrainedLineages:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
)
elif lineage_aspect.upstreams:
yield _convert_upstream_lineage_to_patch(
yield convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
Expand Down