Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): extract powerbi endorsements to tags #6638

Merged
merged 13 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,9 @@ combine_result
`Pattern-2` is *not* supported for upstream table lineage extraction as it uses a nested item-selector i.e. {Source{[Schema="public",Item="book"]}[Data], Source{[Schema="public",Item="issue_history"]}[Data]} as an argument to the M-Query table function i.e. Table.Combine

`Pattern-1` is supported as it first assigns the table from the schema to a variable, and then the variable is used in the M-Query Table function i.e. Table.Combine

## Extract endorsements to tags

By default, extracting endorsement information to tags is disabled. The feature may be useful if an organization uses [endorsements](https://learn.microsoft.com/en-us/power-bi/collaborate-share/service-endorse-content) to identify content quality.

Please note that the default implementation overwrites tags for the ingested entities, if you need to preserve existing tags, consider using a [transformer](../../../../metadata-ingestion/docs/transformer/dataset_transformer.md#simple-add-dataset-globaltags) with `semantics: PATCH` tags instead of `OVERWRITE`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

3 changes: 3 additions & 0 deletions metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ source:
client_secret: bar
# Enable / Disable ingestion of user information for dashboards
extract_ownership: true
# Enable / Disable ingestion of endorsements.
# Please note that this may overwrite any existing tags defined on the ingested entities!
extract_endorsements_to_tags: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can keep it true by default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per John's comment, I'll keep this disabled by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

# dataset_type_mapping is fixed mapping of Power BI datasources type to equivalent Datahub "data platform" dataset
dataset_type_mapping:
PostgreSql: postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Constant:
CORP_USER_INFO = "corpUserInfo"
CORP_USER_KEY = "corpUserKey"
CHART_INFO = "chartInfo"
GLOBAL_TAGS = "globalTags"
STATUS = "status"
CHART_ID = "powerbi.linkedin.com/charts/{}"
CHART_KEY = "chartKey"
Expand Down Expand Up @@ -139,6 +140,12 @@ class PowerBiAPIConfig(EnvBasedSourceConfigBase):
extract_lineage: bool = pydantic.Field(
default=True, description="Whether lineage should be ingested"
)
# Enable/Disable extracting endorsements to tags. Please note this may overwrite
# any existing tags defined on those entities
extract_endorsements_to_tags: bool = pydantic.Field(
default=False,
description="Whether to extract endorsements to tags, note that this may overwrite existing tags",
)
# Enable/Disable extracting lineage information from PowerBI Native query
native_query_parsing: bool = pydantic.Field(
default=True,
Expand Down
48 changes: 48 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@
DashboardKeyClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
StatusClass,
SubTypesClass,
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
)
Expand Down Expand Up @@ -238,8 +240,24 @@ def to_datahub_dataset(
if self.__config.extract_lineage is True:
dataset_mcps.extend(self.extract_lineage(table, ds_urn))

self.append_tag_mcp(
dataset_mcps,
ds_urn,
Constant.DATASET,
dataset.tags,
)

return dataset_mcps

@staticmethod
def transform_tags(tags: List[str]) -> GlobalTagsClass:
    """Convert a list of tag names into a DataHub GlobalTags aspect.

    Each tag name is turned into a tag URN and wrapped in a
    TagAssociationClass before being attached to the aspect.
    """
    associations = []
    for tag_name in tags:
        associations.append(TagAssociationClass(builder.make_tag_urn(tag_name)))
    return GlobalTagsClass(tags=associations)

def to_datahub_chart_mcp(
self, tile: PowerBiAPI.Tile, ds_mcps: List[MetadataChangeProposalWrapper]
) -> List[MetadataChangeProposalWrapper]:
Expand Down Expand Up @@ -421,8 +439,31 @@ def chart_custom_properties(dashboard: PowerBiAPI.Dashboard) -> dict:
if owner_mcp is not None:
list_of_mcps.append(owner_mcp)

self.append_tag_mcp(
list_of_mcps,
dashboard_urn,
Constant.DASHBOARD,
dashboard.tags,
)

return list_of_mcps

def append_tag_mcp(
    self,
    list_of_mcps: List[MetadataChangeProposalWrapper],
    entity_urn: str,
    entity_type: str,
    tags: List[str],
) -> None:
    """Append a GlobalTags MCP for the entity, if endorsement extraction is enabled.

    No-op when the feature flag is off or when there are no tags to attach;
    otherwise mutates ``list_of_mcps`` in place by appending one MCP.
    """
    # Guard clause: nothing to emit unless the feature is on AND tags exist.
    if not self.__config.extract_endorsements_to_tags or not tags:
        return
    list_of_mcps.append(
        self.new_mcp(
            entity_type=entity_type,
            entity_urn=entity_urn,
            aspect_name=Constant.GLOBAL_TAGS,
            aspect=self.transform_tags(tags),
        )
    )

def to_datahub_user(
self, user: PowerBiAPI.User
) -> List[MetadataChangeProposalWrapper]:
Expand Down Expand Up @@ -678,6 +719,13 @@ def report_to_dashboard(
if owner_mcp is not None:
list_of_mcps.append(owner_mcp)

self.append_tag_mcp(
list_of_mcps,
dashboard_urn,
Constant.DASHBOARD,
report.tags,
)

return list_of_mcps

def report_to_datahub_work_units(
Expand Down
79 changes: 77 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class Workspace:
state: str
dashboards: List[Any]
datasets: Dict[str, "PowerBiAPI.PowerBIDataset"]
report_endorsements: Dict[str, List[str]]
dashboard_endorsements: Dict[str, List[str]]

@dataclass
class DataSource:
Expand Down Expand Up @@ -89,6 +91,7 @@ class PowerBIDataset:
workspace_id: str
# Table in datasets
tables: List["PowerBiAPI.Table"]
tags: List[str]

def get_urn_part(self):
return f"datasets.{self.id}"
Expand Down Expand Up @@ -148,6 +151,7 @@ class Report:
dataset: Optional["PowerBiAPI.PowerBIDataset"]
pages: List["PowerBiAPI.Page"]
users: List["PowerBiAPI.User"]
tags: List[str]

def get_urn_part(self):
return f"reports.{self.id}"
Expand Down Expand Up @@ -181,6 +185,7 @@ class Dashboard:
workspace_name: str
tiles: List["PowerBiAPI.Tile"]
users: List["PowerBiAPI.User"]
tags: List[str]

def get_urn_part(self):
return f"dashboards.{self.id}"
Expand Down Expand Up @@ -313,6 +318,7 @@ def _get_report(
description=response_dict.get("description"),
users=[],
pages=[],
tags=[],
dataset=self.get_dataset(
workspace_id=workspace_id, dataset_id=response_dict.get("datasetId")
),
Expand Down Expand Up @@ -356,7 +362,6 @@ def get_dashboard_users(self, dashboard: Dashboard) -> List[User]:
def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
"""
Get the list of dashboard from PowerBi for the given workspace identifier

TODO: Pagination. As per REST API doc (https://docs.microsoft.com/en-us/rest/api/power-bi/dashboards/get
-dashboards), there is no information available on pagination
"""
Expand Down Expand Up @@ -394,14 +399,48 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
workspace_name=workspace.name,
tiles=[],
users=[],
tags=workspace.dashboard_endorsements.get(instance.get("id", None), []),
)
for instance in dashboards_dict
if instance is not None
]

return dashboards

def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
def get_dashboard_endorsements(self, scan_result: dict) -> Dict[str, List[str]]:
    """
    Store saved dashboard endorsements into a dict with dashboard id as key and
    endorsements or tags as list of strings.

    Dashboards without endorsementDetails map to an empty list.
    """
    results = {}

    # Use .get with a default so a scan result without a "dashboards" key
    # doesn't raise KeyError — consistent with how report endorsements are
    # collected (scan_result.get("reports", [])).
    for scanned_dashboard in scan_result.get("dashboards", []):
        # Iterate through response and create a list of PowerBiAPI.Dashboard
        dashboard_id = scanned_dashboard.get("id")
        tags = self.parse_endorsement(
            scanned_dashboard.get("endorsementDetails", None)
        )
        results[dashboard_id] = tags

    return results

@staticmethod
def parse_endorsement(endorsements: Optional[dict]) -> List[str]:
if not endorsements:
return []

endorsement = endorsements.get("endorsement", None)
if not endorsement:
return []

return [endorsement]

def get_dataset(
self,
workspace_id: str,
dataset_id: str,
endorsements: Optional[dict] = None,
) -> Any:
"""
Fetch the dataset from PowerBi for the given dataset identifier
"""
Expand Down Expand Up @@ -437,6 +476,10 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
logger.debug("datasets = {}".format(response_dict))
# PowerBi Always return the webURL, in-case if it is None then setting complete webURL to None instead of
# None/details
tags = []
if self.__config.extract_endorsements_to_tags:
tags = self.parse_endorsement(endorsements)

return PowerBiAPI.PowerBIDataset(
id=response_dict.get("id"),
name=response_dict.get("name"),
Expand All @@ -445,6 +488,7 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
else None,
workspace_id=workspace_id,
tables=[],
tags=tags,
)

def get_data_sources(
Expand Down Expand Up @@ -678,6 +722,9 @@ def get_reports(
workspace_id=workspace.id, entity="reports", _id=raw_instance["id"]
),
dataset=workspace.datasets.get(raw_instance.get("datasetId")),
tags=workspace.report_endorsements.get(
raw_instance.get("id", None), []
),
)
for raw_instance in response_dict["value"]
]
Expand All @@ -704,6 +751,8 @@ def get_workspaces(self):
state="",
datasets={},
dashboards=[],
report_endorsements={},
dashboard_endorsements={},
)
for workspace in groups.get("value", [])
if workspace.get("type", None) == "Workspace"
Expand Down Expand Up @@ -843,6 +892,7 @@ def json_to_dataset_map(scan_result: dict) -> dict:
dataset_instance: PowerBiAPI.PowerBIDataset = self.get_dataset(
workspace_id=scan_result["id"],
dataset_id=dataset_dict["id"],
endorsements=dataset_dict.get("endorsementDetails", None),
)
dataset_map[dataset_instance.id] = dataset_instance
# set dataset-name
Expand Down Expand Up @@ -877,6 +927,20 @@ def init_dashboard_tiles(workspace: PowerBiAPI.Workspace) -> None:

return None

def scan_result_to_report_endorsements(
    scan_result: dict,
) -> Dict[str, List[str]]:
    """Map each scanned report id to its endorsement tags.

    Reports without endorsementDetails map to an empty list; a missing
    "reports" key yields an empty dict. ``self`` is a free variable from
    the enclosing method's scope (closure).
    """
    return {
        report.get("id", ""): self.parse_endorsement(
            report.get("endorsementDetails", None)
        )
        for report in scan_result.get("reports", [])
    }

logger.info("Creating scan job for workspace")
logger.info("{}={}".format(Constant.WorkspaceId, workspace_id))
logger.debug("Hitting URL={}".format(scan_create_endpoint))
Expand All @@ -902,7 +966,18 @@ def init_dashboard_tiles(workspace: PowerBiAPI.Workspace) -> None:
state=scan_result["state"],
datasets={},
dashboards=[],
report_endorsements={},
dashboard_endorsements={},
)

if self.__config.extract_endorsements_to_tags:
workspace.dashboard_endorsements = self.get_dashboard_endorsements(
scan_result
)
workspace.report_endorsements = scan_result_to_report_endorsements(
scan_result
)

# Get workspace dashboards
workspace.dashboards = self.get_dashboards(workspace)

Expand Down
Loading