Skip to content

Commit

Permalink
fix: Enforce single domain restriction in transformer to align with c…
Browse files Browse the repository at this point in the history
…urrent UI capabilities
  • Loading branch information
sagar-salvi-apptware committed May 13, 2024
1 parent 28d0a91 commit a039084
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 96 deletions.
83 changes: 56 additions & 27 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
| `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct)

## Extract Ownership from Tags
Expand Down Expand Up @@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```



## Domain Mapping Based on Tags
### Config Details

| Field | Required | Type | Default | Description |
|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------|
| `domain_mapping`| ✅ | Dict[str, str] | | Dataset Entity tag as key and domain urn or name as value to map with dataset as asset. |
| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.|

<br/>

let’s suppose we’d like to add domain to dataset based on tag, in this case you can use `domain_mapping_based_on_tags` transformer.

The config, which we’d append to our ingestion recipe YAML, would look like this:

Here we can set domains to either urn (i.e. urn:li:domain:engineering) or simple domain name (i.e. engineering) in both of the cases domain should be provisioned on DataHub GMS

When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN.

For example, instead of using the tag URN urn:li:tag:NeedsDocumentation, you should specify just the simple tag name NeedsDocumentation in the domain mapping configuration

```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
domain_mapping:
'NeedsDocumentation': "urn:li:domain:documentation"
```


`domain_mapping_based_on_tags` can be configured in below different way

- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: OVERWRITE # OVERWRITE is default behaviour
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```
- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: PATCH
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```

## Simple Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
Expand Down Expand Up @@ -1103,32 +1158,6 @@ The config, which we’d append to our ingestion recipe YAML, would look like th
".*example2.*": "urn:li:dataProduct:second"
```

## Domain Mapping Based on Tags
### Config Details
| Field | Required | Type | Default | Description |
|----------------------|----------|--------------------|---------|-----------------------------------------------------------------------------------------------|
| `domain_mapping` | ✅ | map[string, list[string]] | | Maps tag suffixes to lists of domain URNs. Associates tagged entities with specified domains. |

### Overview

The domain_mapping_based_on_tags transformer is part of the data ingestion framework, which is used to dynamically assign domains to entities based on their tags. This module scans the tags of entities and matches their tags against predefined rules, assigning the corresponding domains if a match is found.

### Usage Example

To configure this functionality within your ingestion recipe, you can specify the transformer and its configuration in your YAML file. Here’s an example configuration that map tags to domains:

```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
domain_mapping:
rules:
'test:tag': ["hr", "urn:li:domain:finance"]
```

In this example:
- Entities tagged with `test:tag` will be associated with the domains `hr` and `urn:li:domain:finance`.

## Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import List, Optional, Set, cast
from typing import Dict, List, Optional, Set, cast

from datahub.configuration.common import ConfigModel, KeyValuePattern
from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass


class DatasetTagDomainMapperConfig(ConfigModel):
domain_mapping: KeyValuePattern = KeyValuePattern.all()
class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel):
domain_mapping: Dict[str, str]


class DatasetTagDomainMapper(DatasetDomainTransformer):
Expand All @@ -30,46 +33,38 @@ def create(
def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
# Initialize or extend the existing domain aspect
# Initialize the existing domain aspect
existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect)
domain_aspect = DomainsClass(
domains=existing_domain_aspect.domains if existing_domain_aspect else []
)

domain_mapping = self.config.domain_mapping
assert self.ctx.graph
global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn)
# Check if we have tags received in existing aspect
if global_tags:
transformer_tags = domain_mapping.rules.keys()
domain_mapping = self.config.domain_mapping
transformer_tags = domain_mapping.keys()
tags_seen: Set[str] = set()
for tag_item in global_tags.tags:
tag = tag_item.tag.split("urn:li:tag:")[-1]
if tag in transformer_tags:
tags_seen.add(tag)

if tags_seen:
domains_to_add = DatasetTagDomainMapper.get_matching_domains(
domain_mapping, tags_seen
)
domain_aspect = DomainsClass(domains=[])
domains_to_add: List[str] = []
for tag in tags_seen:
if domain_mapping.get(tag):
domains_to_add.append(domain_mapping[tag])

mapped_domains = AddDatasetDomain.get_domain_class(
self.ctx.graph, domains_to_add
)
domain_aspect.domains.extend(mapped_domains.domains)
# Try merging with server-side domains
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
return cast(Optional[Aspect], patch_domain_aspect)

return cast(Optional[Aspect], domain_aspect)

@staticmethod
def get_matching_domains(
mapping: KeyValuePattern, tags_seen: Set[str]
) -> List[str]:
domains_to_add = []
for tag in tags_seen:
domains_to_add.extend(mapping.value(tag))
return domains_to_add
if self.config.semantics == TransformerSemantics.PATCH:
# Try merging with server-side domains
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
return cast(Optional[Aspect], patch_domain_aspect)
return cast(Optional[Aspect], domain_aspect)
return cast(Optional[Aspect], existing_domain_aspect)
68 changes: 31 additions & 37 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3463,28 +3463,25 @@ def test_pattern_cleanup_usage_statistics_user_3(
assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_domain_mapping_based_on_tags(mock_datahub_graph):
def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
server_domain = builder.make_domain_urn("test.io")

pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
tag_one = builder.make_tag_urn("test:tag_1")

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(
tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))]
)
return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)])

pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

pipeline_context.graph.get_tags = fake_get_tags # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[gslab_domain]),
config={
"domain_mapping": {"rules": {"Test": [acryl_domain, server_domain]}},
},
aspect=models.DomainsClass(domains=[server_domain]),
config={"domain_mapping": {"test:tag_1": acryl_domain}},
pipeline_context=pipeline_context,
)

Expand All @@ -3495,10 +3492,9 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 3
assert gslab_domain in transformed_aspect.domains
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
assert server_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph):
Expand All @@ -3509,7 +3505,7 @@ def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph):
pipeline_context = PipelineContext(run_id="no_match_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return tags that do not match any rule
# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)])

Expand All @@ -3519,7 +3515,7 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[server_domain]),
config={
"domain_mapping": {"rules": {"Test": [acryl_domain]}},
"domain_mapping": {"test:tag_1": acryl_domain},
},
pipeline_context=pipeline_context,
)
Expand All @@ -3538,6 +3534,7 @@ def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph):
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)])

Expand All @@ -3546,42 +3543,44 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[]),
config={"domain_mapping": {"rules": {}}},
config={"domain_mapping": {}},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 0


def test_domain_mapping_based_on_tags_with_multiple_matches(mock_datahub_graph):
def test_domain_mapping_based__r_on_tags_with_multiple_tags(mock_datahub_graph):
# Two tags that match different rules in the domain mapping configuration
tag_one = builder.make_tag_urn("test:tag_1")
tag_two = builder.make_tag_urn("test:tag_2")

existing_domain = builder.make_domain_urn("existing.io")
finance = builder.make_domain_urn("finance")
hr = builder.make_domain_urn("hr")
marketing = builder.make_domain_urn("marketing")
sales = builder.make_domain_urn("sales")

pipeline_context = PipelineContext(run_id="multiple_matches_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behavior with multiple matching tags
# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(
tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)]
)

# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[existing_domain])

pipeline_context.graph.get_tags = fake_get_tags # type: ignore
pipeline_context.graph.get_domain = fake_get_domain # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[]),
aspect=models.DomainsClass(domains=[existing_domain]),
config={
"domain_mapping": {
"rules": {"test:tag_1": [finance, hr], "test:tag_2": [marketing, sales]}
},
"domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
"semantics": "PATCH",
},
pipeline_context=pipeline_context,
)
Expand All @@ -3594,11 +3593,8 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)

# Expecting domains from both matched tags
expected_domains = set([finance, hr, marketing, sales])
assert set(transformed_aspect.domains) == expected_domains
assert (
len(transformed_aspect.domains) == 4
) # Ensure all expected domains are present and no duplicates
assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr}
assert len(transformed_aspect.domains) == 3


def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph):
Expand All @@ -3607,6 +3603,7 @@ def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph):
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[])

Expand All @@ -3615,9 +3612,7 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[acryl_domain]),
config={
"domain_mapping": {"rules": {"Test": [server_domain]}},
},
config={"domain_mapping": {"test:tag_1": server_domain}},
pipeline_context=pipeline_context,
)

Expand All @@ -3636,6 +3631,7 @@ def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph):
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
return None

Expand All @@ -3644,9 +3640,7 @@ def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[acryl_domain]),
config={
"domain_mapping": {"rules": {"Test": [server_domain]}},
},
config={"domain_mapping": {"test:tag_1": server_domain}},
pipeline_context=pipeline_context,
)

Expand Down

0 comments on commit a039084

Please sign in to comment.