Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

add logging statements for transformer matches #10407

Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ def populate_terms_in_schema_metadata(
) -> None:
for schema_field in schema_metadata.fields:
if schema_field.fieldPath in field_terms:
logger.info(
f"Attaching glossary term '{field_terms[schema_field.fieldPath]}' "
f"to field '{schema_field.fieldPath}'"
)
schema_field.glossaryTerms = GlossaryTerms(
terms=[
GlossaryTermAssociation(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Callable, Dict, List, Optional, cast

import datahub.emitter.mce_builder as builder
Expand All @@ -14,11 +15,14 @@
from datahub.metadata.schema_classes import (
AuditStampClass,
GlossaryTermAssociationClass,
GlossaryTermInfoClass,
GlossaryTermsClass,
SchemaFieldClass,
SchemaMetadataClass,
)

logger: logging.Logger = logging.getLogger(__name__)


class AddDatasetSchemaTermsConfig(TransformerSemanticsConfigModel):
get_terms_to_add: Callable[[str], List[GlossaryTermAssociationClass]]
Expand All @@ -43,7 +47,10 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetSchemaTer
return cls(config, ctx)

def extend_field(
self, schema_field: SchemaFieldClass, server_field: Optional[SchemaFieldClass]
self,
schema_field: SchemaFieldClass,
server_field: Optional[SchemaFieldClass],
entity_urn: str,
) -> SchemaFieldClass:
all_terms = self.config.get_terms_to_add(schema_field.fieldPath)
if len(all_terms) == 0:
Expand Down Expand Up @@ -76,9 +83,29 @@ def extend_field(
new_glossary_terms.extend(terms_to_add)

unique_gloseary_terms = []
term_log_entries = []
for term in new_glossary_terms:
if term not in unique_gloseary_terms:
unique_gloseary_terms.append(term)
if self.ctx.graph:
term_aspect = self.ctx.graph.get_aspect(
entity_urn=term.urn, aspect_type=GlossaryTermInfoClass
)
if term_aspect:
term_name = term_aspect.name
term_log_entries.append(
f"Term URN: {term.urn}, Term Name: {term_name}, Field Path: {schema_field.fieldPath}, Entity URN: {entity_urn}"
)
else:
term_log_entries.append(
f"Term URN: {term.urn}, Term Name: Not Found, Field Path: {schema_field.fieldPath}, Entity URN: {entity_urn}"
)

# Log the collected term details together
if term_log_entries:
logger.debug("Adding below terms to fields:")
for entry in term_log_entries:
logger.debug(entry)

new_glossary_term = GlossaryTermsClass(
terms=[],
Expand Down Expand Up @@ -130,7 +157,11 @@ def transform_aspect(
return None

schema_metadata_aspect.fields = [
self.extend_field(field, server_field=server_field_map.get(field.fieldPath))
self.extend_field(
field,
server_field=server_field_map.get(field.fieldPath),
entity_urn=entity_urn,
)
for field in schema_metadata_aspect.fields
]

Expand Down
Loading