Skip to content

Commit

Permalink
fix(transformers): pattern add domain transformer - enable replace_ex…
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored and Oleg Ruban committed Feb 28, 2023
1 parent 53bcfa5 commit 01cb6eb
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 1 deletion.
Expand Up @@ -155,6 +155,7 @@ def resolve_domain(domain_urn: str) -> DomainsClass:
generic_config = AddDatasetDomainSemanticsConfig(
get_domains_to_add=resolve_domain,
semantics=config.semantics,
replace_existing=config.replace_existing,
)
super().__init__(generic_config, ctx)

Expand Down
230 changes: 229 additions & 1 deletion metadata-ingestion/tests/unit/test_transform_dataset.py
Expand Up @@ -56,7 +56,10 @@
BaseTransformer,
SingleAspectTransformer,
)
from datahub.ingestion.transformer.dataset_domain import SimpleAddDatasetDomain
from datahub.ingestion.transformer.dataset_domain import (
PatternAddDatasetDomain,
SimpleAddDatasetDomain,
)
from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.remove_dataset_ownership import (
Expand Down Expand Up @@ -1658,6 +1661,16 @@ def run_dataset_transformer_pipeline(
return outputs


def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

transformer = SimpleAddDatasetDomain.create({"domains": []}, pipeline_context)
assert transformer.aspect_name() == models.DomainsClass.ASPECT_NAME


def test_simple_add_dataset_domain(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
Expand Down Expand Up @@ -1822,6 +1835,221 @@ def fake_get_domain(entity_urn: str) -> models.DomainsClass:
assert server_domain in transformed_aspect.domains


def test_pattern_add_dataset_domain_aspect_name(mock_datahub_graph):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

transformer = PatternAddDatasetDomain.create(
{"domain_pattern": {"rules": {}}}, pipeline_context
)
assert transformer.aspect_name() == models.DomainsClass.ASPECT_NAME


def test_pattern_add_dataset_domain_match(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"

pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert gslab_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains


def test_pattern_add_dataset_domain_no_match(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*"

pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert gslab_domain in transformed_aspect.domains
assert acryl_domain not in transformed_aspect.domains


def test_pattern_add_dataset_domain_replace_existing_match(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"

pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"replace_existing": True,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert gslab_domain not in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains


def test_pattern_add_dataset_domain_replace_existing_no_match(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*"

pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"replace_existing": True,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 0


def test_pattern_add_dataset_domain_semantics_overwrite(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
server_domain = builder.make_domain_urn("test.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"

pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])

pipeline_context.graph.get_domain = fake_get_domain # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"semantics": TransformerSemantics.OVERWRITE,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 2
assert gslab_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains


def test_pattern_add_dataset_domain_semantics_patch(
pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
server_domain = builder.make_domain_urn("test.io")
pattern = "urn:li:dataset:\\(urn:li:dataPlatform:bigquery,.*"

pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[server_domain])

pipeline_context.graph.get_domain = fake_get_domain # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=PatternAddDatasetDomain,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"domain_pattern": {"rules": {pattern: [acryl_domain]}},
},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 3
assert gslab_domain in transformed_aspect.domains
assert acryl_domain in transformed_aspect.domains
assert server_domain in transformed_aspect.domains


def test_simple_dataset_ownership_transformer_semantics_patch(mock_datahub_graph):
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
Expand Down

0 comments on commit 01cb6eb

Please sign in to comment.