refactor(ingest/graph): Factor out filter logic #8888

Merged 3 commits on Sep 28, 2023
7 changes: 2 additions & 5 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -13,11 +13,8 @@
from datahub.cli import cli_utils
from datahub.configuration.datetimes import ClickDatetime
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.ingestion.graph.client import (
DataHubGraph,
RemovedStatusFilter,
get_default_graph,
)
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.filters import RemovedStatusFilter
from datahub.telemetry import telemetry
from datahub.upgrade import upgrade
from datahub.utilities.perf_timer import PerfTimer
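For callers like delete_cli.py, this is purely an import-path update: RemovedStatusFilter now lives in datahub.ingestion.graph.filters, and the CLI imports it from there directly (client.py still imports the name from the new module, so the old path should keep resolving). A minimal sketch of the new-style import:

    # Canonical location after this PR:
    from datahub.ingestion.graph.filters import RemovedStatusFilter

    status = RemovedStatusFilter.NOT_SOFT_DELETED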
171 changes: 8 additions & 163 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -16,15 +16,15 @@
from datahub.cli.cli_utils import get_url_and_token
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
from datahub.emitter.mce_builder import (
DEFAULT_ENV,
Aspect,
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mce_builder import DEFAULT_ENV, Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.serialization_helper import post_json_transform
from datahub.ingestion.graph.filters import (
RemovedStatusFilter,
SearchFilterRule,
generate_filter,
)
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.metadata.schema_classes import (
ASPECT_NAME_MAP,
@@ -59,8 +59,6 @@

logger = logging.getLogger(__name__)

SearchFilterRule = Dict[str, Any]


class DatahubClientConfig(ConfigModel):
"""Configuration class for holding connectivity to datahub gms"""
@@ -81,19 +79,6 @@ class DatahubClientConfig(ConfigModel):
DataHubGraphConfig = DatahubClientConfig


class RemovedStatusFilter(enum.Enum):
"""Filter for the status of entities during search."""

NOT_SOFT_DELETED = "NOT_SOFT_DELETED"
"""Search only entities that have not been marked as deleted."""

ALL = "ALL"
"""Search all entities, including deleted entities."""

ONLY_SOFT_DELETED = "ONLY_SOFT_DELETED"
"""Search only soft-deleted entities."""


@dataclass
class RelatedEntity:
urn: str
@@ -567,7 +552,7 @@ def _bulk_fetch_schema_info_by_filter(
# Add the query default of * if no query is specified.
query = query or "*"

orFilters = self.generate_filter(
orFilters = generate_filter(
platform, platform_instance, env, container, status, extraFilters
)

@@ -621,54 +606,6 @@ def _bulk_fetch_schema_info_by_filter(
if entity.get("schemaMetadata"):
yield entity["urn"], entity["schemaMetadata"]

def generate_filter(
self,
platform: Optional[str],
platform_instance: Optional[str],
env: Optional[str],
container: Optional[str],
status: RemovedStatusFilter,
extraFilters: Optional[List[SearchFilterRule]],
) -> List[Dict[str, List[SearchFilterRule]]]:
andFilters: List[SearchFilterRule] = []

# Platform filter.
if platform:
andFilters.append(self._get_platform_filter(platform))

# Platform instance filter.
if platform_instance:
andFilters.append(
self._get_platform_instance_filter(platform, platform_instance)
)

# Browse path v2 filter.
if container:
andFilters.append(self._get_container_filter(container))

# Status filter.
status_filter = self._get_status_filer(status)
if status_filter:
andFilters.append(status_filter)

# Extra filters.
if extraFilters:
andFilters += extraFilters

orFilters: List[Dict[str, List[SearchFilterRule]]] = [{"and": andFilters}]

# Env filter
if env:
envOrConditions = self._get_env_or_conditions(env)
# This matches ALL of the andFilters and at least one of the envOrConditions.
orFilters = [
{"and": andFilters["and"] + [extraCondition]}
for extraCondition in envOrConditions
for andFilters in orFilters
]

return orFilters

def get_urns_by_filter(
self,
*,
@@ -709,7 +646,7 @@ def get_urns_by_filter(
query = query or "*"

# Env filter.
orFilters = self.generate_filter(
orFilters = generate_filter(
platform, platform_instance, env, container, status, extraFilters
)

@@ -778,98 +715,6 @@ def _scroll_across_entities(
f"Scrolling to next scrollAcrossEntities page: {scroll_id}"
)

def _get_env_or_conditions(self, env: str) -> List[SearchFilterRule]:
# The env filter is a bit more tricky since it's not always stored
# in the same place in ElasticSearch.
return [
# For most entity types, we look at the origin field.
{
"field": "origin",
"value": env,
"condition": "EQUAL",
},
# For containers, we look at the customProperties field.
# For any containers created after https://github.com/datahub-project/datahub/pull/8027,
# we look for the "env" property. Otherwise, we use the "instance" property.
{
"field": "customProperties",
"value": f"env={env}",
},
{
"field": "customProperties",
"value": f"instance={env}",
},
# Note that not all entity types have an env (e.g. dashboards / charts).
# If the env filter is specified, these will be excluded.
]

def _get_status_filer(
self, status: RemovedStatusFilter
) -> Optional[SearchFilterRule]:
if status == RemovedStatusFilter.NOT_SOFT_DELETED:
# Subtle: in some cases (e.g. when the dataset doesn't have a status aspect), the
# removed field is simply not present in the ElasticSearch document. Ideally this
# would be a "removed" : "false" filter, but that doesn't work. Instead, we need to
# use a negated filter.
return {
"field": "removed",
"values": ["true"],
"condition": "EQUAL",
"negated": True,
}

elif status == RemovedStatusFilter.ONLY_SOFT_DELETED:
return {
"field": "removed",
"values": ["true"],
"condition": "EQUAL",
}

elif status == RemovedStatusFilter.ALL:
# We don't need to add a filter for this case.
return None
else:
raise ValueError(f"Invalid status filter: {status}")

def _get_container_filter(self, container: str) -> SearchFilterRule:
# Warn if container is not a fully qualified urn.
# TODO: Change this once we have a first-class container urn type.
if guess_entity_type(container) != "container":
raise ValueError(f"Invalid container urn: {container}")

return {
"field": "browsePathV2",
"values": [container],
"condition": "CONTAIN",
}

def _get_platform_instance_filter(
self, platform: Optional[str], platform_instance: str
) -> SearchFilterRule:
if platform:
# Massage the platform instance into a fully qualified urn, if necessary.
platform_instance = make_dataplatform_instance_urn(
platform, platform_instance
)

# Warn if platform_instance is not a fully qualified urn.
# TODO: Change this once we have a first-class data platform instance urn type.
if guess_entity_type(platform_instance) != "dataPlatformInstance":
raise ValueError(f"Invalid data platform instance urn: {platform_instance}")

return {
"field": "platformInstance",
"values": [platform_instance],
"condition": "EQUAL",
}

def _get_platform_filter(self, platform: str) -> SearchFilterRule:
return {
"field": "platform.keyword",
"values": [make_data_platform_urn(platform)],
"condition": "EQUAL",
}

def _get_types(self, entity_types: Optional[List[str]]) -> Optional[List[str]]:
types: Optional[List[str]] = None
if entity_types is not None:
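The call sites inside client.py change only in that generate_filter is now a module-level function imported from datahub.ingestion.graph.filters rather than a method on DataHubGraph. A hedged sketch of the new call shape; argument values below are placeholders, while the real call sites forward the method parameters (platform, platform_instance, env, container, status, extraFilters):

    from datahub.ingestion.graph.filters import RemovedStatusFilter, generate_filter

    # Previously: orFilters = self.generate_filter(...). Same positional
    # arguments, now passed to the free function.
    orFilters = generate_filter(
        "snowflake",                           # platform (hypothetical)
        None,                                  # platform_instance
        None,                                  # env
        None,                                  # container
        RemovedStatusFilter.NOT_SOFT_DELETED,  # status
        None,                                  # extra_filters
    )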
162 changes: 162 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/filters.py
@@ -0,0 +1,162 @@
import enum
from typing import Any, Dict, List, Optional

from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.utilities.urns.urn import guess_entity_type

SearchFilterRule = Dict[str, Any]


class RemovedStatusFilter(enum.Enum):
"""Filter for the status of entities during search."""

NOT_SOFT_DELETED = "NOT_SOFT_DELETED"
"""Search only entities that have not been marked as deleted."""

ALL = "ALL"
"""Search all entities, including deleted entities."""

ONLY_SOFT_DELETED = "ONLY_SOFT_DELETED"
"""Search only soft-deleted entities."""


def generate_filter(
platform: Optional[str],
platform_instance: Optional[str],
env: Optional[str],
container: Optional[str],
status: RemovedStatusFilter,
extra_filters: Optional[List[SearchFilterRule]],
) -> List[Dict[str, List[SearchFilterRule]]]:
and_filters: List[SearchFilterRule] = []

# Platform filter.
if platform:
and_filters.append(_get_platform_filter(platform))

# Platform instance filter.
if platform_instance:
and_filters.append(_get_platform_instance_filter(platform, platform_instance))

# Browse path v2 filter.
if container:
and_filters.append(_get_container_filter(container))

# Status filter.
status_filter = _get_status_filter(status)
if status_filter:
and_filters.append(status_filter)

# Extra filters.
if extra_filters:
and_filters += extra_filters

or_filters: List[Dict[str, List[SearchFilterRule]]] = [{"and": and_filters}]

# Env filter
if env:
env_filters = _get_env_filters(env)
        # This matches ALL of the and_filters and at least one of the env_filters.
or_filters = [
{"and": and_filter["and"] + [extraCondition]}
for extraCondition in env_filters
for and_filter in or_filters
]

return or_filters


def _get_env_filters(env: str) -> List[SearchFilterRule]:
# The env filter is a bit more tricky since it's not always stored
# in the same place in ElasticSearch.
return [
# For most entity types, we look at the origin field.
{
"field": "origin",
"value": env,
"condition": "EQUAL",
},
# For containers, we look at the customProperties field.
# For any containers created after https://github.com/datahub-project/datahub/pull/8027,
# we look for the "env" property. Otherwise, we use the "instance" property.
{
"field": "customProperties",
"value": f"env={env}",
},
{
"field": "customProperties",
"value": f"instance={env}",
},
# Note that not all entity types have an env (e.g. dashboards / charts).
# If the env filter is specified, these will be excluded.
]


def _get_status_filter(status: RemovedStatusFilter) -> Optional[SearchFilterRule]:
if status == RemovedStatusFilter.NOT_SOFT_DELETED:
# Subtle: in some cases (e.g. when the dataset doesn't have a status aspect), the
# removed field is simply not present in the ElasticSearch document. Ideally this
# would be a "removed" : "false" filter, but that doesn't work. Instead, we need to
# use a negated filter.
return {
"field": "removed",
"values": ["true"],
"condition": "EQUAL",
"negated": True,
}

elif status == RemovedStatusFilter.ONLY_SOFT_DELETED:
return {
"field": "removed",
"values": ["true"],
"condition": "EQUAL",
}

elif status == RemovedStatusFilter.ALL:
# We don't need to add a filter for this case.
return None
else:
raise ValueError(f"Invalid status filter: {status}")


def _get_container_filter(container: str) -> SearchFilterRule:
# Warn if container is not a fully qualified urn.
# TODO: Change this once we have a first-class container urn type.
if guess_entity_type(container) != "container":
raise ValueError(f"Invalid container urn: {container}")

return {
"field": "browsePathV2",
"values": [container],
"condition": "CONTAIN",
}


def _get_platform_instance_filter(
platform: Optional[str], platform_instance: str
) -> SearchFilterRule:
if platform:
# Massage the platform instance into a fully qualified urn, if necessary.
platform_instance = make_dataplatform_instance_urn(platform, platform_instance)

# Warn if platform_instance is not a fully qualified urn.
# TODO: Change this once we have a first-class data platform instance urn type.
if guess_entity_type(platform_instance) != "dataPlatformInstance":
raise ValueError(f"Invalid data platform instance urn: {platform_instance}")

return {
"field": "platformInstance",
"values": [platform_instance],
"condition": "EQUAL",
}


def _get_platform_filter(platform: str) -> SearchFilterRule:
return {
"field": "platform.keyword",
"values": [make_data_platform_urn(platform)],
"condition": "EQUAL",
}
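To make the OR/AND composition concrete, here is a hedged usage sketch; the platform and env values are illustrative and not taken from the PR. With a platform filter, the default soft-delete status, and an env, generate_filter returns one OR clause per env condition, each repeating the shared AND filters:

    from datahub.ingestion.graph.filters import RemovedStatusFilter, generate_filter

    or_filters = generate_filter(
        platform="snowflake",                  # hypothetical platform
        platform_instance=None,
        env="PROD",                            # hypothetical env
        container=None,
        status=RemovedStatusFilter.NOT_SOFT_DELETED,
        extra_filters=None,
    )

    # Expected shape: three OR clauses, each ANDing the platform filter, the
    # negated "removed" filter, and one of the three env conditions
    # (origin, customProperties env=..., customProperties instance=...).
    for clause in or_filters:
        print([rule["field"] for rule in clause["and"]])
    # ['platform.keyword', 'removed', 'origin']
    # ['platform.keyword', 'removed', 'customProperties']
    # ['platform.keyword', 'removed', 'customProperties']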