refactor(ingest): subtypes - standardize (#7437)
shirshanka committed Feb 28, 2023
1 parent 28b973e commit 17e8597
Showing 61 changed files with 346 additions and 283 deletions.
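At a high level, the commit replaces the ad-hoc string literals that individual ingestion sources passed into SubTypes aspects and container definitions with shared constants from a new datahub.ingestion.source.common.subtypes module; several emitted values also change casing along the way (for example "table" becomes "Table" and "topic" becomes "Topic"). A minimal before/after sketch of the pattern, not copied from any single file and using an invented example URN:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.ingestion.source.common.subtypes import DatasetSubTypes
    from datahub.metadata.schema_classes import SubTypesClass

    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,example.topic,PROD)"

    # Before: each source spelled the subtype as a bare string literal.
    before = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=SubTypesClass(typeNames=["topic"]),
    )

    # After: the spelling lives in one shared enum.
    after = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=SubTypesClass(typeNames=[DatasetSubTypes.TOPIC]),
    )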
11 changes: 7 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -52,6 +52,10 @@
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri, make_s3_urn
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
@@ -881,7 +885,7 @@ def gen_database_containers(
container_workunits = gen_containers(
container_key=database_container_key,
name=database["Name"],
sub_types=["Database"],
sub_types=[DatasetContainerSubTypes.DATABASE],
domain_urn=domain_urn,
description=database.get("Description"),
qualified_name=self.get_glue_arn(
@@ -965,7 +969,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# possible via Dataset snapshot embedded in a mce, so we have to generate a mcp.
workunit = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypes(typeNames=["table"]),
aspect=SubTypes(typeNames=[DatasetSubTypes.TABLE]),
).as_workunit()
self.report.report_workunit(workunit)
yield workunit
@@ -1147,9 +1151,8 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
logger.warning(
"Could not connect to DatahubApi. No current tags to maintain"
)

# Remove duplicate tags
tags_to_add = list(set(tags_to_add))
tags_to_add = sorted(list(set(tags_to_add)))
new_tags = GlobalTagsClass(
tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
)
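The deduplication above also gains a sort, presumably so the emitted GlobalTags aspect keeps a stable order across ingestion runs; a small standalone sketch of the difference:

    tags_to_add = ["pii", "finance", "pii", "gold"]

    # set() drops duplicates, but a straight list() conversion leaves the order
    # up to hash iteration order, which can differ between interpreter runs.
    unordered = list(set(tags_to_add))

    # Sorting after deduplication pins one canonical order, so repeated runs
    # emit identical tag lists.
    ordered = sorted(set(tags_to_add))
    assert ordered == ["finance", "gold", "pii"]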
@@ -67,7 +67,7 @@ def get_s3_tags(
else:
logger.warn("Could not connect to DatahubApi. No current tags to maintain")
# Remove duplicate tags
tags_to_add = list(set(tags_to_add))
tags_to_add = sorted(list(set(tags_to_add)))
new_tags = GlobalTagsClass(
tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
)
@@ -60,6 +60,10 @@
)
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
gen_database_container,
@@ -455,7 +459,7 @@ def gen_project_id_containers(self, database: str) -> Iterable[MetadataWorkUnit]
yield from gen_database_container(
database=database,
name=database,
sub_types=["Project"],
sub_types=[DatasetContainerSubTypes.BIGQUERY_PROJECT],
domain_registry=self.domain_registry,
domain_config=self.config.domain,
report=self.report,
@@ -472,7 +476,7 @@ def gen_dataset_containers(
yield from gen_schema_container(
database=project_id,
schema=dataset,
sub_types=["Dataset"],
sub_types=[DatasetContainerSubTypes.BIGQUERY_DATASET],
domain_registry=self.domain_registry,
domain_config=self.config.domain,
report=self.report,
@@ -911,11 +915,11 @@ def gen_table_dataset_workunits(
custom_properties["max_partition_id"] = str(table.max_partition_id)
custom_properties["is_partitioned"] = str(True)

sub_types: List[str] = ["table"]
sub_types: List[str] = [DatasetSubTypes.TABLE]
if table.max_shard_id:
custom_properties["max_shard_id"] = str(table.max_shard_id)
custom_properties["is_sharded"] = str(True)
sub_types = ["sharded table", "table"]
sub_types = ["sharded table"] + sub_types

tags_to_add = None
if table.labels and self.config.capture_table_label_as_tag:
@@ -946,7 +950,7 @@ def gen_view_dataset_workunits(
columns=columns,
project_id=project_id,
dataset_name=dataset_name,
sub_types=["view"],
sub_types=[DatasetSubTypes.VIEW],
)

view = cast(BigqueryView, table)
Empty file.
40 changes: 40 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py
@@ -0,0 +1,40 @@
from enum import Enum


class DatasetSubTypes(str, Enum):
# Generic SubTypes
TABLE = "Table"
VIEW = "View"
TOPIC = "Topic"
SCHEMA = "Schema"
# System-Specific SubTypes
LOOKER_EXPLORE = "Explore"
ELASTIC_INDEX_TEMPLATE = "Index Template"
ELASTIC_INDEX = "Index"
ELASTIC_DATASTREAM = "Datastream"
SALESFORCE_CUSTOM_OBJECT = "Custom Object"
SALESFORCE_STANDARD_OBJECT = "Object"


class DatasetContainerSubTypes(str, Enum):
# Generic SubTypes
DATABASE = "Database"
SCHEMA = "Schema"
# System-Specific SubTypes
PRESTO_CATALOG = "Catalog"
BIGQUERY_PROJECT = "Project"
BIGQUERY_DATASET = "Dataset"
DATABRICKS_METASTORE = "Metastore"
S3_FOLDER = "Folder"
S3_BUCKET = "S3 bucket"


class BIContainerSubTypes(str, Enum):
LOOKER_FOLDER = "Folder"
TABLEAU_WORKBOOK = "Workbook"
POWERBI_WORKSPACE = "Workspace"


class BIAssetSubTypes(str, Enum):
# Generic SubTypes
REPORT = "Report"
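Because each enum mixes in str, its members are real strings that compare equal to the literals the sources used before, so the payloads of emitted aspects are unchanged wherever the spelling stays the same. A minimal standalone sketch of that property, using a stripped-down copy of the enum rather than the real module:

    import json
    from enum import Enum


    class DatasetSubTypes(str, Enum):
        TABLE = "Table"
        VIEW = "View"


    # str mix-in: members are str instances and compare equal to the literal value.
    assert DatasetSubTypes.TABLE == "Table"
    assert isinstance(DatasetSubTypes.TABLE, str)

    # JSON serialization therefore still produces the bare string.
    assert json.dumps([DatasetSubTypes.VIEW]) == '["View"]'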
@@ -29,6 +29,7 @@
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
SchemaField,
@@ -407,11 +408,11 @@ def _extract_mcps(
entityUrn=dataset_urn,
aspect=SubTypesClass(
typeNames=[
"Index Template"
DatasetSubTypes.ELASTIC_INDEX_TEMPLATE
if not is_index
else "Index"
else DatasetSubTypes.ELASTIC_INDEX
if not data_stream
else "Datastream"
else DatasetSubTypes.ELASTIC_DATASTREAM
]
),
)
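The chained conditional expression in _extract_mcps can be hard to scan; a hypothetical helper restating the same selection (the function name and bool annotations are invented for illustration):

    from datahub.ingestion.source.common.subtypes import DatasetSubTypes

    def elastic_subtype(is_index: bool, data_stream: bool) -> DatasetSubTypes:
        # Same logic as the one-line expression above, written out explicitly.
        if not is_index:
            return DatasetSubTypes.ELASTIC_INDEX_TEMPLATE  # "Index Template"
        if not data_stream:
            return DatasetSubTypes.ELASTIC_INDEX  # "Index"
        return DatasetSubTypes.ELASTIC_DATASTREAM  # "Datastream"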
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -37,6 +37,7 @@
from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -281,7 +282,7 @@ def _extract_record(
id=f"{topic}-subtype",
mcp=MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=["topic"]),
aspect=SubTypesClass(typeNames=[DatasetSubTypes.TOPIC]),
),
)
self.report.report_workunit(subtype_wu)
@@ -34,6 +34,7 @@
from datahub.emitter.mcp_builder import create_embed_mcp
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
from datahub.ingestion.source.sql.sql_types import (
POSTGRES_TYPES_MAP,
@@ -894,7 +895,7 @@ def _to_metadata_events( # noqa: C901
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_snapshot.urn,
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["explore"]),
aspect=SubTypesClass(typeNames=[DatasetSubTypes.LOOKER_EXPLORE]),
)

proposals: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = [
@@ -586,7 +586,7 @@ def _get_looker_dashboard_element( # noqa: C901
)
)

explores = list(set(explores)) # dedup the list of views
explores = sorted(list(set(explores))) # dedup the list of views

return LookerDashboardElement(
id=element.id,
@@ -33,6 +33,7 @@
)
from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.git.git_import import GitClone
from datahub.ingestion.source.looker.looker_common import (
LookerCommonConfig,
@@ -1357,7 +1358,7 @@ def _build_dataset_mcps(
changeType=ChangeTypeClass.UPSERT,
entityUrn=looker_view.id.get_urn(self.source_config),
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["view"]),
aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]),
)
events = [subTypeEvent]
if looker_view.view_details is not None:
@@ -9,6 +9,7 @@
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DEFAULT_ENV
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
@@ -97,7 +98,7 @@ class Constant:
EXPRESSION = "expression"
SOURCE = "source"
PLATFORM_NAME = "powerbi"
REPORT_TYPE_NAME = "Report"
REPORT_TYPE_NAME = BIAssetSubTypes.REPORT
CHART_COUNT = "chartCount"
WORKSPACE_NAME = "workspaceName"
DATASET_WEB_URL = "datasetWebUrl"
@@ -23,6 +23,7 @@
)
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import BIContainerSubTypes
from datahub.ingestion.source.powerbi.config import (
Constant,
PlatformDetail,
@@ -525,7 +526,7 @@ def generate_container_for_workspace(
container_work_units = gen_containers(
container_key=workspace_key,
name=workspace.name,
sub_types=["Workspace"],
sub_types=[BIContainerSubTypes.POWERBI_WORKSPACE],
)
return container_work_units

3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -27,6 +27,7 @@
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
@@ -484,7 +485,7 @@ def _extract_record(
# 6. Emit subtype aspect marking this as a "topic"
subtype_wu = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=["topic"]),
aspect=SubTypesClass(typeNames=[DatasetSubTypes.TOPIC]),
).as_workunit()
self.report.report_workunit(subtype_wu)
yield subtype_wu
@@ -15,6 +15,7 @@
get_bucket_relative_path,
is_s3_uri,
)
from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes

# hide annoying debug errors from py4j
logging.getLogger("py4j").setLevel(logging.ERROR)
@@ -79,7 +80,7 @@ def create_container_hierarchy(
yield from self.create_emit_containers(
container_key=bucket_key,
name=bucket_name,
sub_types=["S3 bucket"],
sub_types=[DatasetContainerSubTypes.S3_BUCKET],
parent_container_key=None,
)
parent_key = bucket_key
@@ -111,7 +112,7 @@ def create_container_hierarchy(
yield from self.create_emit_containers(
container_key=folder_key,
name=folder,
sub_types=["Folder"],
sub_types=[DatasetContainerSubTypes.S3_FOLDER],
parent_container_key=parent_key,
)
parent_key = folder_key
7 changes: 4 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/salesforce.py
@@ -28,6 +28,7 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.schema_classes import (
AuditStampClass,
BooleanTypeClass,
@@ -448,11 +449,11 @@ def get_properties_workunit(
).as_workunit()

def get_subtypes_workunit(self, sObjectName: str, datasetUrn: str) -> WorkUnit:
subtypes = []
subtypes: List[str] = []
if sObjectName.endswith("__c"):
subtypes.append("Custom Object")
subtypes.append(DatasetSubTypes.SALESFORCE_CUSTOM_OBJECT)
else:
subtypes.append("Standard Object")
subtypes.append(DatasetSubTypes.SALESFORCE_STANDARD_OBJECT)

return MetadataChangeProposalWrapper(
entityUrn=datasetUrn, aspect=SubTypesClass(typeNames=subtypes)
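One behavioral detail in the Salesforce change: the standard-object branch now emits DatasetSubTypes.SALESFORCE_STANDARD_OBJECT, whose value is "Object", whereas the old literal was "Standard Object". A hypothetical helper restating the branch (the function name is invented for illustration):

    from datahub.ingestion.source.common.subtypes import DatasetSubTypes

    def salesforce_subtype(sobject_name: str) -> str:
        # Salesforce custom objects carry the "__c" suffix.
        if sobject_name.endswith("__c"):
            return DatasetSubTypes.SALESFORCE_CUSTOM_OBJECT  # "Custom Object"
        return DatasetSubTypes.SALESFORCE_STANDARD_OBJECT  # "Object" (was "Standard Object")

    assert salesforce_subtype("Invoice__c") == "Custom Object"
    assert salesforce_subtype("Account") == "Object"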
@@ -36,6 +36,7 @@
JsonSchemaTranslator,
get_schema_metadata,
)
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
@@ -314,7 +315,7 @@ def _load_one_file(
yield self._report_and_return(
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.SubTypesClass(typeNames=["schema"]),
aspect=models.SubTypesClass(typeNames=[DatasetSubTypes.SCHEMA]),
).as_workunit()
)

@@ -36,6 +36,10 @@
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.glossary.classification_mixin import ClassificationMixin
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.snowflake.constants import (
GENERIC_PERMISSION_ERROR_KEY,
SNOWFLAKE_DATABASE,
@@ -73,7 +77,6 @@
SnowflakePermissionError,
SnowflakeQueryMixin,
)
from datahub.ingestion.source.sql.sql_common import SqlContainerSubTypes
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
gen_database_container,
@@ -1013,7 +1016,7 @@ def gen_dataset_workunits(
yield dpi_aspect

subTypes = SubTypes(
typeNames=["view"] if isinstance(table, SnowflakeView) else ["table"]
typeNames=[DatasetSubTypes.VIEW]
if isinstance(table, SnowflakeView)
else [DatasetSubTypes.TABLE]
)

yield MetadataChangeProposalWrapper(
@@ -1222,7 +1227,7 @@ def gen_database_containers(
name=database.name,
database=self.snowflake_identifier(database.name),
database_container_key=database_container_key,
sub_types=[SqlContainerSubTypes.DATABASE],
sub_types=[DatasetContainerSubTypes.DATABASE],
domain_registry=self.domain_registry,
domain_config=self.config.domain,
report=self.report,
@@ -1269,7 +1274,7 @@ def gen_schema_containers(
database_container_key=database_container_key,
domain_config=self.config.domain,
schema_container_key=schema_container_key,
sub_types=[SqlContainerSubTypes.SCHEMA],
sub_types=[DatasetContainerSubTypes.SCHEMA],
report=self.report,
domain_registry=self.domain_registry,
description=schema.comment,
