From 9cf48228745cc7fed46a06705d0ce7d1e2672dea Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Wed, 8 May 2024 17:48:39 +0530 Subject: [PATCH 1/3] fix(ingestion/transformer): Add dataset domains based on tags using transformer --- .../docs/transformer/dataset_transformer.md | 26 ++++ metadata-ingestion/setup.py | 1 + .../dataset_domain_based_on_tags.py | 82 ++++++++++ .../tests/unit/test_transform_dataset.py | 141 ++++++++++++++++++ 4 files changed, 250 insertions(+) create mode 100644 metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 217d65ed05d6a..64c2410be7dc3 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -1103,6 +1103,32 @@ The config, which we’d append to our ingestion recipe YAML, would look like th ".*example2.*": "urn:li:dataProduct:second" ``` +## Domain Mapping Based on Tags +### Config Details +| Field | Required | Type | Default | Description | +|----------------------|----------|--------------------|---------|-----------------------------------------------------------------------------------------------| +| `domain_mapping` | ✅ | map[string, list] | | Maps tag suffixes to lists of domain URNs. Associates tagged entities with specified domains. | + +### Overview + +The `domain_mapping_based_on_tags` transformer is part of the data ingestion framework, which is used to dynamically assign domains to entities based on their tags. This module scans the tags of entities and matches their suffixes against predefined rules, assigning the corresponding domains if a match is found. + +### Usage Example + +To configure this functionality within your ingestion recipe, you can specify the transformer and its configuration in your YAML file. Here’s an example configuration: + +```yaml +transformers: + - type: "domain_mapping_based_on_tags" + config: + domain_mapping: + rules: + 'test:tag': ["hr", "urn:li:domain:finance"] +``` + +In this example: +- Entities tagged with `urn:li:tag:dbt:test:tag` or any tag that ends with `test:tag` will be associated with the domains `hr` and `urn:li:domain:finance`. 
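As a quick illustration of the suffix matching that this first revision of the docs describes (not part of the patch itself), the sketch below shows how a tag URN such as `urn:li:tag:dbt:test:tag` would resolve against the example rule; the tag values are made up for illustration:

```python
# Minimal sketch of the suffix-based rule matching described in the docs above.
# `rules` mirrors the `domain_mapping.rules` config; the tag URNs are illustrative.
rules = {"test:tag": ["hr", "urn:li:domain:finance"]}
tag_urns = ["urn:li:tag:dbt:test:tag", "urn:li:tag:PII"]

domains_to_add = []
for tag in tag_urns:
    for suffix, domains in rules.items():
        if tag.endswith(suffix):  # suffix match: "urn:li:tag:dbt:test:tag" ends with "test:tag"
            domains_to_add.extend(domains)

print(domains_to_add)  # ['hr', 'urn:li:domain:finance']
```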
+ ## Add Dataset dataProduct ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 668845e7764c3..9d35b9b8cadf5 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -708,6 +708,7 @@ "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl", "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser", + "domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py new file mode 100644 index 0000000000000..47fbbcac3e5cf --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py @@ -0,0 +1,82 @@ +from typing import List, Optional, Set, cast + +from datahub.configuration.common import ConfigModel, KeyValuePattern +from datahub.emitter.mce_builder import Aspect +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain +from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer +from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass + + +class DatasetTagDomainMapperConfig(ConfigModel): + domain_mapping: KeyValuePattern = KeyValuePattern.all() + + +class DatasetTagDomainMapper(DatasetDomainTransformer): + """A transformer that appends a predefined set of domains to each dataset that includes specific tags defined in the transformer.""" + + def __init__(self, config: DatasetTagDomainMapperConfig, ctx: PipelineContext): + super().__init__() + self.ctx: PipelineContext = ctx + self.config: DatasetTagDomainMapperConfig = config + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "DatasetTagDomainMapper": + config = DatasetTagDomainMapperConfig.parse_obj(config_dict) + return cls(config, ctx) + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + domain_mapping = self.config.domain_mapping + assert self.ctx.graph + global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn) + # Check if we have tags received existing aspect + if global_tags is None: + return None + + # Initialize or extend the existing domain aspect + existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect) + domain_aspect = DomainsClass( + domains=existing_domain_aspect.domains if existing_domain_aspect else [] + ) + transformer_tags = domain_mapping.rules.keys() + # Filter tags that match the transformer tags and remove duplicates + tags_seen = { + tag_item.tag + for tag_item in global_tags.tags + if any(tag_item.tag.endswith(key) for key in transformer_tags) + } + + if tags_seen: + domains_to_add = DatasetTagDomainMapper.get_matching_domains( + domain_mapping, tags_seen + ) + mapped_domains = AddDatasetDomain.get_domain_class( + self.ctx.graph, domains_to_add + ) + domain_aspect.domains.extend(mapped_domains.domains) + # Try merging with server-side domains + patch_domain_aspect: 
Optional[ + DomainsClass + ] = AddDatasetDomain._merge_with_server_domains( + self.ctx.graph, entity_urn, domain_aspect + ) + return cast(Optional[Aspect], patch_domain_aspect) + + return cast(Optional[Aspect], domain_aspect) + + @staticmethod + def get_matching_domains( + mapping: KeyValuePattern, tags_seen: Set[str] + ) -> List[str]: + domains_to_add = [] + # Iterate over each seen tag + for tag in tags_seen: + # Check each key in the domain mapping rules to see if the tag ends with this key + for key, domains in mapping.rules.items(): + if tag.endswith(key): + domains_to_add.extend(domains) + return domains_to_add diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 7e01dd8909568..84c6aa9fb2444 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -67,6 +67,9 @@ PatternAddDatasetDomain, SimpleAddDatasetDomain, ) +from datahub.ingestion.transformer.dataset_domain_based_on_tags import ( + DatasetTagDomainMapper, +) from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags from datahub.ingestion.transformer.extract_ownership_from_tags import ( @@ -3458,3 +3461,141 @@ def test_pattern_cleanup_usage_statistics_user_3( assert output[0].record.aspect assert len(output[0].record.aspect.userCounts) == 2 assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts + + +def test_domain_mapping_based_on_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + gslab_domain = builder.make_domain_urn("gslab.io") + server_domain = builder.make_domain_urn("test.io") + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behaviour + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass( + tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))] + ) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[gslab_domain]), + config={ + "domain_mapping": {"rules": {"Test": [acryl_domain, server_domain]}}, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0] is not None + assert output[0].record is not None + assert isinstance(output[0].record, MetadataChangeProposalWrapper) + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 3 + assert gslab_domain in transformed_aspect.domains + assert acryl_domain in transformed_aspect.domains + assert server_domain in transformed_aspect.domains + + +def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + non_matching_tag = builder.make_tag_urn("nonMatching") + + pipeline_context = PipelineContext(run_id="no_match_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return tags that do not match any rule + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return 
models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[server_domain]), + config={ + "domain_mapping": {"rules": {"Test": [acryl_domain]}}, + }, + pipeline_context=pipeline_context, + ) + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 1 + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain not in transformed_aspect.domains + assert server_domain in transformed_aspect.domains + + +def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph): + some_tag = builder.make_tag_urn("someTag") + + pipeline_context = PipelineContext(run_id="empty_config_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[]), + config={"domain_mapping": {"rules": {}}}, + pipeline_context=pipeline_context, + ) + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 0 + + +def test_domain_mapping_based_on_tags_with_multiple_matches(mock_datahub_graph): + # Two tags that match different rules in the domain mapping configuration + tag_one = builder.make_tag_urn("test:tag_1") + tag_two = builder.make_tag_urn("test:tag_2") + + finance = builder.make_domain_urn("finance") + hr = builder.make_domain_urn("hr") + marketing = builder.make_domain_urn("marketing") + sales = builder.make_domain_urn("sales") + + pipeline_context = PipelineContext(run_id="multiple_matches_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + # Return fake aspect to simulate server behavior with multiple matching tags + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass( + tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)] + ) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[]), + config={ + "domain_mapping": { + "rules": {"test:tag_1": [finance, hr], "test:tag_2": [marketing, sales]} + }, + }, + pipeline_context=pipeline_context, + ) + + # Assertions to verify the expected outcome + assert len(output) == 2 + assert output[0].record is not None + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + + # Expecting domains from both matched tags + expected_domains = set([finance, hr, marketing, sales]) + assert set(transformed_aspect.domains) == expected_domains + assert ( + len(transformed_aspect.domains) == 4 + ) # Ensure all expected domains are present and no duplicates From 28d0a914bfa6e0e084bd83d29fadc718e382a5ed Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Thu, 9 May 2024 23:37:47 +0530 Subject: [PATCH 2/3] fix: Fixed PR Review 
Comments --- .../docs/transformer/dataset_transformer.md | 8 +-- .../dataset_domain_based_on_tags.py | 61 ++++++++----------- .../tests/unit/test_transform_dataset.py | 58 ++++++++++++++++++ 3 files changed, 89 insertions(+), 38 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 64c2410be7dc3..ff7cdfa05386f 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -1107,15 +1107,15 @@ The config, which we’d append to our ingestion recipe YAML, would look like th ### Config Details | Field | Required | Type | Default | Description | |----------------------|----------|--------------------|---------|-----------------------------------------------------------------------------------------------| -| `domain_mapping` | ✅ | map[string, list] | | Maps tag suffixes to lists of domain URNs. Associates tagged entities with specified domains. | +| `domain_mapping` | ✅ | map[string, list[string]] | | Maps tag suffixes to lists of domain URNs. Associates tagged entities with specified domains. | ### Overview -The `domain_mapping_based_on_tags` transformer is part of the data ingestion framework, which is used to dynamically assign domains to entities based on their tags. This module scans the tags of entities and matches their suffixes against predefined rules, assigning the corresponding domains if a match is found. +The domain_mapping_based_on_tags transformer is part of the data ingestion framework, which is used to dynamically assign domains to entities based on their tags. This module scans the tags of entities and matches their tags against predefined rules, assigning the corresponding domains if a match is found. ### Usage Example -To configure this functionality within your ingestion recipe, you can specify the transformer and its configuration in your YAML file. Here’s an example configuration: +To configure this functionality within your ingestion recipe, you can specify the transformer and its configuration in your YAML file. Here’s an example configuration that map tags to domains: ```yaml transformers: @@ -1127,7 +1127,7 @@ transformers: ``` In this example: -- Entities tagged with `urn:li:tag:dbt:test:tag` or any tag that ends with `test:tag` will be associated with the domains `hr` and `urn:li:domain:finance`. +- Entities tagged with `test:tag` will be associated with the domains `hr` and `urn:li:domain:finance`. 
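To make the revised matching concrete, here is a small sketch (not from the patch) of how this second revision compares tags: the `urn:li:tag:` prefix is stripped and the remaining simple name must equal a rule key exactly, so suffix matches such as `dbt:test:tag` no longer apply; all values below are illustrative:

```python
# Sketch of the exact-match behaviour introduced in this revision (illustrative values).
rules = {"test:tag": ["hr", "urn:li:domain:finance"]}
tag_urns = ["urn:li:tag:test:tag", "urn:li:tag:dbt:test:tag"]

tags_seen = set()
for tag_urn in tag_urns:
    simple_name = tag_urn.split("urn:li:tag:")[-1]  # "test:tag" / "dbt:test:tag"
    if simple_name in rules:  # exact match against the rule keys
        tags_seen.add(simple_name)

print(tags_seen)  # {'test:tag'} -- the dbt-prefixed tag no longer matches
```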
## Add Dataset dataProduct ### Config Details diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py index 47fbbcac3e5cf..9d651510ee1f5 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py @@ -30,41 +30,38 @@ def create( def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: - domain_mapping = self.config.domain_mapping - assert self.ctx.graph - global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn) - # Check if we have tags received existing aspect - if global_tags is None: - return None - # Initialize or extend the existing domain aspect existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect) domain_aspect = DomainsClass( domains=existing_domain_aspect.domains if existing_domain_aspect else [] ) - transformer_tags = domain_mapping.rules.keys() - # Filter tags that match the transformer tags and remove duplicates - tags_seen = { - tag_item.tag - for tag_item in global_tags.tags - if any(tag_item.tag.endswith(key) for key in transformer_tags) - } - if tags_seen: - domains_to_add = DatasetTagDomainMapper.get_matching_domains( - domain_mapping, tags_seen - ) - mapped_domains = AddDatasetDomain.get_domain_class( - self.ctx.graph, domains_to_add - ) - domain_aspect.domains.extend(mapped_domains.domains) - # Try merging with server-side domains - patch_domain_aspect: Optional[ - DomainsClass - ] = AddDatasetDomain._merge_with_server_domains( - self.ctx.graph, entity_urn, domain_aspect - ) - return cast(Optional[Aspect], patch_domain_aspect) + domain_mapping = self.config.domain_mapping + assert self.ctx.graph + global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn) + # Check if we have tags received in existing aspect + if global_tags: + transformer_tags = domain_mapping.rules.keys() + tags_seen: Set[str] = set() + for tag_item in global_tags.tags: + tag = tag_item.tag.split("urn:li:tag:")[-1] + if tag in transformer_tags: + tags_seen.add(tag) + if tags_seen: + domains_to_add = DatasetTagDomainMapper.get_matching_domains( + domain_mapping, tags_seen + ) + mapped_domains = AddDatasetDomain.get_domain_class( + self.ctx.graph, domains_to_add + ) + domain_aspect.domains.extend(mapped_domains.domains) + # Try merging with server-side domains + patch_domain_aspect: Optional[ + DomainsClass + ] = AddDatasetDomain._merge_with_server_domains( + self.ctx.graph, entity_urn, domain_aspect + ) + return cast(Optional[Aspect], patch_domain_aspect) return cast(Optional[Aspect], domain_aspect) @@ -73,10 +70,6 @@ def get_matching_domains( mapping: KeyValuePattern, tags_seen: Set[str] ) -> List[str]: domains_to_add = [] - # Iterate over each seen tag for tag in tags_seen: - # Check each key in the domain mapping rules to see if the tag ends with this key - for key, domains in mapping.rules.items(): - if tag.endswith(key): - domains_to_add.extend(domains) + domains_to_add.extend(mapping.value(tag)) return domains_to_add diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 84c6aa9fb2444..82e93f750f1d9 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -3599,3 +3599,61 @@ def 
fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: assert ( len(transformed_aspect.domains) == 4 ) # Ensure all expected domains are present and no duplicates + + +def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + pipeline_context = PipelineContext(run_id="empty_config_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[]) + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[acryl_domain]), + config={ + "domain_mapping": {"rules": {"Test": [server_domain]}}, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 1 + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain in transformed_aspect.domains + assert server_domain not in transformed_aspect.domains + + +def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + server_domain = builder.make_domain_urn("test.io") + pipeline_context = PipelineContext(run_id="empty_config_pipeline") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]: + return None + + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=DatasetTagDomainMapper, + aspect=models.DomainsClass(domains=[acryl_domain]), + config={ + "domain_mapping": {"rules": {"Test": [server_domain]}}, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert isinstance(output[0].record.aspect, models.DomainsClass) + assert len(output[0].record.aspect.domains) == 1 + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 1 + assert acryl_domain in transformed_aspect.domains + assert server_domain not in transformed_aspect.domains From a039084c42192d924972ffc6e3b0ce76d72a4987 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Mon, 13 May 2024 11:42:02 +0530 Subject: [PATCH 3/3] fix: Enforce single domain restriction in transformer to align with current UI capabilities --- .../docs/transformer/dataset_transformer.md | 83 +++++++++++++------ .../dataset_domain_based_on_tags.py | 59 ++++++------- .../tests/unit/test_transform_dataset.py | 68 +++++++-------- 3 files changed, 114 insertions(+), 96 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index ff7cdfa05386f..64d1438cfcc73 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -13,7 +13,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) | | `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) | | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](#add-dataset-datasetproperties) | -| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains) | +| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains)
- [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) | | `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)
- [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)
- [Add Dataset dataProduct](#add-dataset-dataproduct) ## Extract Ownership from Tags @@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"] 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"] ``` + + + +## Domain Mapping Based on Tags +### Config Details + +| Field | Required | Type | Default | Description | +|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------| +| `domain_mapping`| ✅ | Dict[str, str] | | Dataset entity tag (simple name) as key and domain URN or name as value; datasets carrying the tag are mapped to that domain. | +| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the domains already present on the entity in DataHub GMS. | + +
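To make the OVERWRITE/PATCH semantics in the table above concrete, here is a hedged sketch (not part of the patch) of the resulting `domains` aspect in each mode; the domain URNs are illustrative:

```python
# Illustrative outcome of the `semantics` setting (all values are made up).
existing_server_domains = ["urn:li:domain:existing"]   # already attached on DataHub GMS
mapped_domains = ["urn:li:domain:engineering"]         # resolved from the dataset's tags


def resulting_domains(semantics: str) -> list:
    if semantics == "PATCH":
        # PATCH keeps the server-side domains and adds the mapped ones.
        return existing_server_domains + [
            d for d in mapped_domains if d not in existing_server_domains
        ]
    # OVERWRITE (the default) replaces the aspect with the mapped domains only.
    return list(mapped_domains)


print(resulting_domains("OVERWRITE"))  # ['urn:li:domain:engineering']
print(resulting_domains("PATCH"))      # ['urn:li:domain:existing', 'urn:li:domain:engineering']
```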
+ +let’s suppose we’d like to add domain to dataset based on tag, in this case you can use `domain_mapping_based_on_tags` transformer. + +The config, which we’d append to our ingestion recipe YAML, would look like this: + +Here we can set domains to either urn (i.e. urn:li:domain:engineering) or simple domain name (i.e. engineering) in both of the cases domain should be provisioned on DataHub GMS + +When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN. + +For example, instead of using the tag URN urn:li:tag:NeedsDocumentation, you should specify just the simple tag name NeedsDocumentation in the domain mapping configuration + +```yaml +transformers: + - type: "domain_mapping_based_on_tags" + config: + domain_mapping: + 'NeedsDocumentation': "urn:li:domain:documentation" +``` + + +`domain_mapping_based_on_tags` can be configured in below different way + +- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS +```yaml + transformers: + - type: "domain_mapping_based_on_tags" + config: + semantics: OVERWRITE # OVERWRITE is default behaviour + domain_mapping: + 'example1': "urn:li:domain:engineering" + 'example2': "urn:li:domain:hr" + ``` +- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS +```yaml + transformers: + - type: "domain_mapping_based_on_tags" + config: + semantics: PATCH + domain_mapping: + 'example1': "urn:li:domain:engineering" + 'example2': "urn:li:domain:hr" + ``` + ## Simple Add Dataset dataProduct ### Config Details | Field | Required | Type | Default | Description | @@ -1103,32 +1158,6 @@ The config, which we’d append to our ingestion recipe YAML, would look like th ".*example2.*": "urn:li:dataProduct:second" ``` -## Domain Mapping Based on Tags -### Config Details -| Field | Required | Type | Default | Description | -|----------------------|----------|--------------------|---------|-----------------------------------------------------------------------------------------------| -| `domain_mapping` | ✅ | map[string, list[string]] | | Maps tag suffixes to lists of domain URNs. Associates tagged entities with specified domains. | - -### Overview - -The domain_mapping_based_on_tags transformer is part of the data ingestion framework, which is used to dynamically assign domains to entities based on their tags. This module scans the tags of entities and matches their tags against predefined rules, assigning the corresponding domains if a match is found. - -### Usage Example - -To configure this functionality within your ingestion recipe, you can specify the transformer and its configuration in your YAML file. Here’s an example configuration that map tags to domains: - -```yaml -transformers: - - type: "domain_mapping_based_on_tags" - config: - domain_mapping: - rules: - 'test:tag': ["hr", "urn:li:domain:finance"] -``` - -In this example: -- Entities tagged with `test:tag` will be associated with the domains `hr` and `urn:li:domain:finance`. 
- ## Add Dataset dataProduct ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py index 9d651510ee1f5..7be8069e1b085 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py @@ -1,6 +1,9 @@ -from typing import List, Optional, Set, cast +from typing import Dict, List, Optional, Set, cast -from datahub.configuration.common import ConfigModel, KeyValuePattern +from datahub.configuration.common import ( + TransformerSemantics, + TransformerSemanticsConfigModel, +) from datahub.emitter.mce_builder import Aspect from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain @@ -8,8 +11,8 @@ from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass -class DatasetTagDomainMapperConfig(ConfigModel): - domain_mapping: KeyValuePattern = KeyValuePattern.all() +class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel): + domain_mapping: Dict[str, str] class DatasetTagDomainMapper(DatasetDomainTransformer): @@ -30,46 +33,38 @@ def create( def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: - # Initialize or extend the existing domain aspect + # Initialize the existing domain aspect existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect) - domain_aspect = DomainsClass( - domains=existing_domain_aspect.domains if existing_domain_aspect else [] - ) - - domain_mapping = self.config.domain_mapping assert self.ctx.graph global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn) # Check if we have tags received in existing aspect if global_tags: - transformer_tags = domain_mapping.rules.keys() + domain_mapping = self.config.domain_mapping + transformer_tags = domain_mapping.keys() tags_seen: Set[str] = set() for tag_item in global_tags.tags: tag = tag_item.tag.split("urn:li:tag:")[-1] if tag in transformer_tags: tags_seen.add(tag) + if tags_seen: - domains_to_add = DatasetTagDomainMapper.get_matching_domains( - domain_mapping, tags_seen - ) + domain_aspect = DomainsClass(domains=[]) + domains_to_add: List[str] = [] + for tag in tags_seen: + if domain_mapping.get(tag): + domains_to_add.append(domain_mapping[tag]) + mapped_domains = AddDatasetDomain.get_domain_class( self.ctx.graph, domains_to_add ) domain_aspect.domains.extend(mapped_domains.domains) - # Try merging with server-side domains - patch_domain_aspect: Optional[ - DomainsClass - ] = AddDatasetDomain._merge_with_server_domains( - self.ctx.graph, entity_urn, domain_aspect - ) - return cast(Optional[Aspect], patch_domain_aspect) - - return cast(Optional[Aspect], domain_aspect) - - @staticmethod - def get_matching_domains( - mapping: KeyValuePattern, tags_seen: Set[str] - ) -> List[str]: - domains_to_add = [] - for tag in tags_seen: - domains_to_add.extend(mapping.value(tag)) - return domains_to_add + if self.config.semantics == TransformerSemantics.PATCH: + # Try merging with server-side domains + patch_domain_aspect: Optional[ + DomainsClass + ] = AddDatasetDomain._merge_with_server_domains( + self.ctx.graph, entity_urn, domain_aspect + ) + return cast(Optional[Aspect], patch_domain_aspect) + return cast(Optional[Aspect], domain_aspect) + return 
cast(Optional[Aspect], existing_domain_aspect) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 82e93f750f1d9..a0deae972badb 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -3463,28 +3463,25 @@ def test_pattern_cleanup_usage_statistics_user_3( assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts -def test_domain_mapping_based_on_tags(mock_datahub_graph): +def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph): acryl_domain = builder.make_domain_urn("acryl.io") - gslab_domain = builder.make_domain_urn("gslab.io") server_domain = builder.make_domain_urn("test.io") - pipeline_context = PipelineContext(run_id="transformer_pipe_line") - pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + tag_one = builder.make_tag_urn("test:tag_1") # Return fake aspect to simulate server behaviour def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: - return models.GlobalTagsClass( - tags=[TagAssociationClass(tag=builder.make_tag_urn("Test"))] - ) + return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)]) + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) pipeline_context.graph.get_tags = fake_get_tags # type: ignore output = run_dataset_transformer_pipeline( transformer_type=DatasetTagDomainMapper, - aspect=models.DomainsClass(domains=[gslab_domain]), - config={ - "domain_mapping": {"rules": {"Test": [acryl_domain, server_domain]}}, - }, + aspect=models.DomainsClass(domains=[server_domain]), + config={"domain_mapping": {"test:tag_1": acryl_domain}}, pipeline_context=pipeline_context, ) @@ -3495,10 +3492,9 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: assert output[0].record.aspect is not None assert isinstance(output[0].record.aspect, models.DomainsClass) transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) - assert len(transformed_aspect.domains) == 3 - assert gslab_domain in transformed_aspect.domains + assert len(transformed_aspect.domains) == 1 assert acryl_domain in transformed_aspect.domains - assert server_domain in transformed_aspect.domains + assert server_domain not in transformed_aspect.domains def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph): @@ -3509,7 +3505,7 @@ def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph): pipeline_context = PipelineContext(run_id="no_match_pipeline") pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) - # Return tags that do not match any rule + # Return fake aspect to simulate server behaviour def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)]) @@ -3519,7 +3515,7 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: transformer_type=DatasetTagDomainMapper, aspect=models.DomainsClass(domains=[server_domain]), config={ - "domain_mapping": {"rules": {"Test": [acryl_domain]}}, + "domain_mapping": {"test:tag_1": acryl_domain}, }, pipeline_context=pipeline_context, ) @@ -3538,6 +3534,7 @@ def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph): pipeline_context = PipelineContext(run_id="empty_config_pipeline") pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + # Return fake aspect to 
simulate server behaviour def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)]) @@ -3546,7 +3543,7 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: output = run_dataset_transformer_pipeline( transformer_type=DatasetTagDomainMapper, aspect=models.DomainsClass(domains=[]), - config={"domain_mapping": {"rules": {}}}, + config={"domain_mapping": {}}, pipeline_context=pipeline_context, ) assert len(output) == 2 @@ -3554,34 +3551,36 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: assert len(output[0].record.aspect.domains) == 0 -def test_domain_mapping_based_on_tags_with_multiple_matches(mock_datahub_graph): +def test_domain_mapping_based__r_on_tags_with_multiple_tags(mock_datahub_graph): # Two tags that match different rules in the domain mapping configuration tag_one = builder.make_tag_urn("test:tag_1") tag_two = builder.make_tag_urn("test:tag_2") - + existing_domain = builder.make_domain_urn("existing.io") finance = builder.make_domain_urn("finance") hr = builder.make_domain_urn("hr") - marketing = builder.make_domain_urn("marketing") - sales = builder.make_domain_urn("sales") pipeline_context = PipelineContext(run_id="multiple_matches_pipeline") pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) - # Return fake aspect to simulate server behavior with multiple matching tags + # Return fake aspect to simulate server behaviour def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: return models.GlobalTagsClass( tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)] ) + # Return fake aspect to simulate server behaviour + def fake_get_domain(entity_urn: str) -> models.DomainsClass: + return models.DomainsClass(domains=[existing_domain]) + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + pipeline_context.graph.get_domain = fake_get_domain # type: ignore output = run_dataset_transformer_pipeline( transformer_type=DatasetTagDomainMapper, - aspect=models.DomainsClass(domains=[]), + aspect=models.DomainsClass(domains=[existing_domain]), config={ - "domain_mapping": { - "rules": {"test:tag_1": [finance, hr], "test:tag_2": [marketing, sales]} - }, + "domain_mapping": {"test:tag_1": finance, "test:tag_2": hr}, + "semantics": "PATCH", }, pipeline_context=pipeline_context, ) @@ -3594,11 +3593,8 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) # Expecting domains from both matched tags - expected_domains = set([finance, hr, marketing, sales]) - assert set(transformed_aspect.domains) == expected_domains - assert ( - len(transformed_aspect.domains) == 4 - ) # Ensure all expected domains are present and no duplicates + assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr} + assert len(transformed_aspect.domains) == 3 def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph): @@ -3607,6 +3603,7 @@ def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph): pipeline_context = PipelineContext(run_id="empty_config_pipeline") pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + # Return fake aspect to simulate server behaviour def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: return models.GlobalTagsClass(tags=[]) @@ -3615,9 +3612,7 @@ def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: output = run_dataset_transformer_pipeline( transformer_type=DatasetTagDomainMapper, 
aspect=models.DomainsClass(domains=[acryl_domain]), - config={ - "domain_mapping": {"rules": {"Test": [server_domain]}}, - }, + config={"domain_mapping": {"test:tag_1": server_domain}}, pipeline_context=pipeline_context, ) @@ -3636,6 +3631,7 @@ def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph): pipeline_context = PipelineContext(run_id="empty_config_pipeline") pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + # Return fake aspect to simulate server behaviour def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]: return None @@ -3644,9 +3640,7 @@ def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]: output = run_dataset_transformer_pipeline( transformer_type=DatasetTagDomainMapper, aspect=models.DomainsClass(domains=[acryl_domain]), - config={ - "domain_mapping": {"rules": {"Test": [server_domain]}}, - }, + config={"domain_mapping": {"test:tag_1": server_domain}}, pipeline_context=pipeline_context, )