Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): fully support MCPs in urn_iter primitive #9157

Merged
merged 2 commits into from Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 9 additions & 7 deletions metadata-ingestion/src/datahub/utilities/urns/urn_iter.py
Expand Up @@ -117,17 +117,19 @@ def _modify_at_path(
if isinstance(path[0], int):
assert isinstance(model, list)
model[path[0]] = new_value
elif isinstance(model, MetadataChangeProposalWrapper):
setattr(model, path[0], new_value)
else:
assert isinstance(model, DictWrapper)
elif isinstance(model, DictWrapper):
model._inner_dict[path[0]] = new_value
else:
# isinstance(model, MetadataChangeProposalWrapper)
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
setattr(model, path[0], new_value)
elif isinstance(path[0], int):
assert isinstance(model, list)
return _modify_at_path(model[path[0]], path[1:], new_value)
_modify_at_path(model[path[0]], path[1:], new_value)
elif isinstance(model, DictWrapper):
_modify_at_path(model._inner_dict[path[0]], path[1:], new_value)
else:
assert isinstance(model, DictWrapper)
return _modify_at_path(model._inner_dict[path[0]], path[1:], new_value)
# isinstance(model, MetadataChangeProposalWrapper)
_modify_at_path(getattr(model, path[0]), path[1:], new_value)


def _lowercase_dataset_urn(dataset_urn: str) -> str:
Expand Down
21 changes: 15 additions & 6 deletions metadata-ingestion/tests/unit/serde/test_urn_iterator.py
@@ -1,4 +1,5 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
FineGrainedLineage,
Expand All @@ -10,11 +11,11 @@
from datahub.utilities.urns.urn_iter import list_urns_with_path, lowercase_dataset_urns


def _datasetUrn(tbl):
def _datasetUrn(tbl: str) -> str:
return builder.make_dataset_urn("bigquery", tbl, "PROD")


def _fldUrn(tbl, fld):
def _fldUrn(tbl: str, fld: str) -> str:
return builder.make_schema_field_urn(_datasetUrn(tbl), fld)


Expand Down Expand Up @@ -114,8 +115,10 @@ def test_upstream_lineage_urn_iterator():
]


def _make_test_lineage_obj(upstream: str, downstream: str) -> UpstreamLineage:
return UpstreamLineage(
def _make_test_lineage_obj(
table: str, upstream: str, downstream: str
) -> MetadataChangeProposalWrapper:
lineage = UpstreamLineage(
upstreams=[
Upstream(
dataset=_datasetUrn(upstream),
Expand All @@ -132,11 +135,17 @@ def _make_test_lineage_obj(upstream: str, downstream: str) -> UpstreamLineage:
],
)

return MetadataChangeProposalWrapper(entityUrn=_datasetUrn(table), aspect=lineage)


def test_dataset_urn_lowercase_transformer():
original = _make_test_lineage_obj("upstreamTable", "downstreamTable")
original = _make_test_lineage_obj(
"mainTableName", "upstreamTable", "downstreamTable"
)

expected = _make_test_lineage_obj("upstreamtable", "downstreamtable")
expected = _make_test_lineage_obj(
"maintablename", "upstreamtable", "downstreamtable"
)

assert original != expected # sanity check

Expand Down