Skip to content

Commit

Permalink
refactor(ingest): use aspect map in transformers (#6040)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Sep 26, 2022
1 parent 036f720 commit 611c053
Showing 1 changed file with 2 additions and 49 deletions.
Expand Up @@ -3,66 +3,23 @@
from typing import Any, Dict, Iterable, List, Optional, Type, Union

import datahub.emitter.mce_builder
from datahub.emitter.aspect import ASPECT_MAP
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import ControlRecord, EndOfStream, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.metadata.schema_classes import (
BrowsePathsClass,
ChangeTypeClass,
DataFlowSnapshotClass,
DataJobSnapshotClass,
DataPlatformInstanceClass,
DatasetDeprecationClass,
DatasetPropertiesClass,
DatasetSnapshotClass,
DatasetUpstreamLineageClass,
DomainsClass,
EditableDatasetPropertiesClass,
EditableSchemaMetadataClass,
GlobalTagsClass,
GlossaryTermsClass,
InstitutionalMemoryClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
OwnershipClass,
SchemaMetadataClass,
StatusClass,
UpstreamLineageClass,
ViewPropertiesClass,
)
from datahub.utilities.urns.urn import Urn

log = logging.getLogger(__name__)


class SnapshotAspectRegistry:
"""A registry of aspect name to aspect type mappings, only for snapshot classes. Do not add non-snapshot aspect classes here."""

def __init__(self):
self.aspect_name_type_mapping = {
"ownership": OwnershipClass,
"domains": DomainsClass,
"globalTags": GlobalTagsClass,
"datasetProperties": DatasetPropertiesClass,
"editableDatasetProperties": EditableDatasetPropertiesClass,
"glossaryTerms": GlossaryTermsClass,
"status": StatusClass,
"browsePaths": BrowsePathsClass,
"schemaMetadata": SchemaMetadataClass,
"editableSchemaMetadata": EditableSchemaMetadataClass,
"datasetDeprecation": DatasetDeprecationClass,
"datasetUpstreamLineage": DatasetUpstreamLineageClass,
"upstreamLineage": UpstreamLineageClass,
"institutionalMemory": InstitutionalMemoryClass,
"dataPlatformInstance": DataPlatformInstanceClass,
"viewProperties": ViewPropertiesClass,
}

def get_aspect_type(self, aspect_name: str) -> Optional[Type[Aspect]]:
return self.aspect_name_type_mapping.get(aspect_name)


class LegacyMCETransformer(Transformer, metaclass=ABCMeta):
@abstractmethod
def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
Expand Down Expand Up @@ -103,7 +60,6 @@ def __init__(self):
"dataFlow": DataFlowSnapshotClass,
"dataJob": DataJobSnapshotClass,
}
self.snapshot_aspect_registry = SnapshotAspectRegistry()
mixedin = False
for mixin in [LegacyMCETransformer, SingleAspectTransformer]:
mixedin = mixedin or isinstance(self, mixin)
Expand Down Expand Up @@ -168,9 +124,7 @@ def _transform_or_record_mce(
if mce.proposedSnapshot:
self._record_mce(mce)
if isinstance(self, SingleAspectTransformer):
aspect_type = self.snapshot_aspect_registry.get_aspect_type( # type: ignore
self.aspect_name()
)
aspect_type = ASPECT_MAP.get(self.aspect_name())
if aspect_type:
# if we find a type corresponding to the aspect name we look for it in the mce
old_aspect = datahub.emitter.mce_builder.get_aspect_if_available(
Expand Down Expand Up @@ -279,7 +233,6 @@ def transform(
record=MetadataChangeProposalWrapper(
entityUrn=urn,
entityType=structured_urn.get_type(),
changeType=ChangeTypeClass.UPSERT,
systemMetadata=last_seen_mcp.systemMetadata
if last_seen_mcp
else last_seen_mce_system_metadata,
Expand Down

0 comments on commit 611c053

Please sign in to comment.