Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/looker): ingest looks not part of dashboard #8140

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ class LookerDashboardSourceConfig(
description="Patterns for selecting chart ids that are to be included",
)
include_deleted: bool = Field(
False, description="Whether to include deleted dashboards."
False,
description="Whether to include deleted dashboards and deleted independent looks.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
description="Whether to include deleted dashboards and deleted independent looks.",
description="Whether to include deleted dashboards and looks.",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)
extract_owners: bool = Field(
True,
Expand Down Expand Up @@ -197,7 +198,7 @@ def external_url_defaults_to_api_config_base_url(
) -> Optional[str]:
return v or values.get("base_url")

@validator("extract_independent_looks", pre=True, always=True)
@validator("extract_independent_looks", always=True)
def stateful_ingestion_should_be_enabled(
cls, v: Optional[bool], *, values: Dict[str, Any], **kwargs: Dict[str, Any]
) -> Optional[bool]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,20 @@ def all_dashboards(self, fields: Union[str, List[str]]) -> Sequence[DashboardBas
transport_options=self.transport_options,
)

def all_looks(self, fields: Union[str, List[str]]) -> List[Look]:
def all_looks(
self, fields: Union[str, List[str]], soft_deleted: bool
) -> List[Look]:
self.client_stats.all_looks_calls += 1
looks: List[Look] = list(
self.client.all_looks(
fields=self.__fields_mapper(fields),
transport_options=self.transport_options,
)
)
# Add soft deleted looks
looks.extend(self.search_looks(fields=fields, deleted=True))

if soft_deleted:
# Add soft deleted looks
looks.extend(self.search_looks(fields=fields, deleted=True))

return looks

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.looker import looker_usage
from datahub.ingestion.source.looker.looker_common import (
Expand Down Expand Up @@ -126,7 +127,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase):
accessed_dashboards: int = 0
resolved_user_ids: int = 0
email_ids_missing: int = 0 # resolved users with missing email addresses
reachable_look_registry: List[
reachable_look_registry: Set[
str
] # Keep track of look-id which are reachable from Dashboard

Expand All @@ -141,7 +142,7 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
)
self.reporter._looker_explore_registry = self.explore_registry
self.reporter._looker_api = self.looker_api
self.reachable_look_registry = []
self.reachable_look_registry = set()

self.explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {}

Expand Down Expand Up @@ -574,7 +575,9 @@ def _get_chart_type(
def _make_chart_metadata_events(
self,
dashboard_element: LookerDashboardElement,
dashboard: Optional[LookerDashboard],
dashboard: Optional[
LookerDashboard
], # dashboard will be None if this is a standalone look
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
Expand Down Expand Up @@ -860,7 +863,7 @@ def _get_looker_dashboard(
if element.look_id is not None:
# Keeping track of reachable element from Dashboard
# Later we need to ingest looks which are not reachable from any dashboard
self.reachable_look_registry.append(element.look_id)
self.reachable_look_registry.add(element.look_id)
looker_dashboard_element = self._get_looker_dashboard_element(element)
if looker_dashboard_element is not None:
dashboard_elements.append(looker_dashboard_element)
Expand Down Expand Up @@ -1125,21 +1128,21 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
def emit_independent_looks_mcp(
self, dashboard_element: LookerDashboardElement
) -> Iterable[MetadataWorkUnit]:
yield from [
MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
if isinstance(mce, MetadataChangeEvent)
else MetadataWorkUnit(
id=f"looker-{mce.aspectName}-{mce.entityUrn}", mcp=mce
)
for mce in self._make_chart_metadata_events(
dashboard_element=dashboard_element, dashboard=None

yield from auto_workunit(
stream=self._make_chart_metadata_events(
dashboard_element=dashboard_element,
dashboard=None,
)
]
)

mcp: MetadataChangeProposalWrapper = self._make_metrics_dimensions_chart_mcp(
dashboard_element
yield from auto_workunit(
[
self._make_metrics_dimensions_chart_mcp(
dashboard_element,
)
]
)
yield MetadataWorkUnit(id=f"looker-{mcp.entityUrn}", mcp=mcp)

def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
"""
Expand All @@ -1148,6 +1151,8 @@ def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
if self.source_config.extract_independent_looks is False:
return

self.reporter.report_stage_start("extract_independent_looks")

logger.debug("Extracting looks not part of Dashboard")
look_fields: List[str] = [
"id",
Expand All @@ -1165,7 +1170,9 @@ def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
"slug",
]

all_looks: List[Look] = self.looker_api.all_looks(fields=look_fields)
all_looks: List[Look] = self.looker_api.all_looks(
fields=look_fields, soft_deleted=self.source_config.include_deleted
)
for look in all_looks:
if look.id in self.reachable_look_registry:
# This look is reachable from Dashboard
Expand All @@ -1181,11 +1188,11 @@ def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
LookerDashboardElement
] = self._get_looker_dashboard_element(
DashboardElement(
id=f"looks_{look.id}", # to avoid conflict with element.id prefixing "looks_" to look.id.
id=f"looks_{look.id}", # to avoid conflict with non-standalone looks (element.id prefixes), we add the "looks_" prefix to look.id.
title=look.title,
subtitle_text=look.description,
look_id=look.id,
dashboard_id="NOT AVAILABLE",
dashboard_id=None, # As this is independent look
look=LookWithQuery(query=query),
)
)
Expand All @@ -1196,6 +1203,8 @@ def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
dashboard_element=dashboard_element
)

self.reporter.report_stage_end("extract_independent_looks")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation is off here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.reporter.report_stage_start("list_dashboards")
dashboards = self.looker_api.all_dashboards(fields="id")
Expand Down
45 changes: 45 additions & 0 deletions metadata-ingestion/tests/integration/looker/test_looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

from datahub.ingestion.run.pipeline import Pipeline, PipelineInitError
from datahub.ingestion.source.looker import looker_usage
from datahub.ingestion.source.looker.looker_lib_wrapper import (
LookerAPI,
LookerAPIConfig,
)
from datahub.ingestion.source.looker.looker_query_model import (
HistoryViewField,
LookViewField,
Expand Down Expand Up @@ -318,6 +322,17 @@ def setup_mock_look(mocked_client):
)


def setup_mock_soft_deleted_look(mocked_client):
mocked_client.search_looks.return_value = [
Look(
id="2",
title="Soft Deleted",
description="I am not part of any Dashboard",
query_id="1",
)
]


def setup_mock_dashboard_multiple_charts(mocked_client):
mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
mocked_client.dashboard.return_value = Dashboard(
Expand Down Expand Up @@ -886,3 +901,33 @@ def test_independent_looks_ingest(
output_path=tmp_path / "looker_mces.json",
golden_path=f"{test_resources_dir}/{mce_out_file}",
)


@freeze_time(FROZEN_TIME)
def test_independent_soft_deleted_looks(
pytestconfig,
tmp_path,
mock_time,
):
mocked_client = mock.MagicMock()

with mock.patch("looker_sdk.init40") as mock_sdk:

mock_sdk.return_value = mocked_client
setup_mock_look(mocked_client)
setup_mock_soft_deleted_look(mocked_client)
looker_api = LookerAPI(
config=LookerAPIConfig(
base_url="https://fake.com",
client_id="foo",
client_secret="bar",
)
)
looks: List[Look] = looker_api.all_looks(
fields=["id"],
soft_deleted=True,
)

assert len(looks) == 2
assert looks[0].title == "Outer Look"
assert looks[1].title == "Soft Deleted"
Loading