feat(ingest/tableau): Allow parsing of database name from fullName #8981

Merged
74 changes: 10 additions & 64 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -77,6 +77,7 @@
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
TableauUpstreamReference,
clean_query,
custom_sql_graphql_query,
dashboard_graphql_query,
@@ -85,7 +86,6 @@
get_overridden_info,
get_unique_custom_sql,
make_fine_grained_lineage_class,
make_table_urn,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
@@ -271,7 +271,7 @@ class TableauConfig(
"You can change this if your Tableau projects contain slashes in their names, and you'd like to filter by project.",
)

default_schema_map: dict = Field(
default_schema_map: Dict[str, str] = Field(
default={}, description="Default schema to use when schema is not found."
)
ingest_tags: Optional[bool] = Field(
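The field is now explicitly typed as `Dict[str, str]`, keyed by database name. A minimal sketch of the mapping's shape (the database and schema names below are invented for illustration):

```python
# Supplies a fallback schema when Tableau's metadata omits one for a table.
default_schema_map = {
    "analytics_db": "public",  # tables in analytics_db with no schema resolve to "public"
    "legacy_dwh": "dbo",
}
```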
@@ -997,41 +997,16 @@ def get_upstream_tables(
)
continue

schema = table.get(tableau_constant.SCHEMA) or ""
table_name = table.get(tableau_constant.NAME) or ""
full_name = table.get(tableau_constant.FULL_NAME) or ""
upstream_db = (
table[tableau_constant.DATABASE][tableau_constant.NAME]
if table.get(tableau_constant.DATABASE)
and table[tableau_constant.DATABASE].get(tableau_constant.NAME)
else ""
)
logger.debug(
"Processing Table with Connection Type: {0} and id {1}".format(
table.get(tableau_constant.CONNECTION_TYPE) or "",
table.get(tableau_constant.ID) or "",
try:
ref = TableauUpstreamReference.create(
table, default_schema_map=self.config.default_schema_map
)
)
schema = self._get_schema(schema, upstream_db, full_name)
# if the schema is included within the table name we omit it
if (
schema
and table_name
and full_name
and table_name == full_name
and schema in table_name
):
logger.debug(
f"Omitting schema for upstream table {table[tableau_constant.ID]}, schema included in table name"
)
schema = ""
except Exception as e:
logger.info(f"Failed to generate upstream reference for {table}: {e}")
continue

table_urn = make_table_urn(
table_urn = ref.make_dataset_urn(
self.config.env,
upstream_db,
table.get(tableau_constant.CONNECTION_TYPE) or "",
schema,
table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
)
@@ -1052,7 +1027,7 @@ def get_upstream_tables(
urn=table_urn,
id=table[tableau_constant.ID],
num_cols=num_tbl_cols,
paths=set([table_path]) if table_path else set(),
paths={table_path} if table_path else set(),
)
else:
self.database_tables[table_urn].update_table(
@@ -2462,35 +2437,6 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
is_embedded_ds=True,
)

@lru_cache(maxsize=None)
Collaborator (author) commented:

I lose the lru_cache capabilities, but I don't think that matters... these are such simple and quick operations.
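For context, a minimal sketch of what `lru_cache` was buying here: memoization of a pure string parse, reproduced from the removed helper below (the standalone-function form and call values are illustrative):

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def extract_schema_from_full_name(full_name: str) -> str:
    # Same parsing as the removed _extract_schema_from_fullName:
    # "[schema].[table]" -> "schema", anything else -> ""
    if full_name.startswith("[") and "].[" in full_name:
        return full_name[1 : full_name.index("]")]
    return ""

extract_schema_from_full_name("[sales].[orders]")  # computed once
extract_schema_from_full_name("[sales].[orders]")  # served from the cache
```

Since the parse is a couple of string operations, recomputing it per call is indeed cheap.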

def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str:
# For some databases, the schema attribute in tableau api does not return
# correct schema name for the table. For more information, see
# https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_model.html#schema_attribute.
# Hence we extract schema from fullName whenever fullName is available
schema = self._extract_schema_from_fullName(fullName) if fullName else ""
if not schema:
schema = schema_provided
elif schema != schema_provided:
logger.debug(
"Correcting schema, provided {0}, corrected {1}".format(
schema_provided, schema
)
)

if not schema and database in self.config.default_schema_map:
schema = self.config.default_schema_map[database]

return schema

@lru_cache(maxsize=None)
def _extract_schema_from_fullName(self, fullName: str) -> str:
# fullName is observed to be in format [schemaName].[tableName]
# OR simply tableName OR [tableName]
if fullName.startswith("[") and "].[" in fullName:
return fullName[1 : fullName.index("]")]
return ""

@lru_cache(maxsize=None)
def get_last_modified(
self, creator: Optional[str], created_at: bytes, updated_at: bytes
162 changes: 123 additions & 39 deletions metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -1,11 +1,14 @@
import html
import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Tuple

from pydantic.fields import Field

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.source import tableau_constant as tc
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
@@ -31,6 +34,8 @@
)
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult

logger = logging.getLogger(__name__)


class TableauLineageOverrides(ConfigModel):
platform_override_map: Optional[Dict[str, str]] = Field(
@@ -537,12 +542,12 @@ def get_fully_qualified_table_name(
platform: str,
upstream_db: str,
schema: str,
full_name: str,
table_name: str,
Collaborator (author) commented:

This was never passed full_name:

```python
            table_urn = make_table_urn(
                self.config.env,
                upstream_db,
                table.get(tableau_constant.CONNECTION_TYPE) or "",
                schema,
                table_name,
                self.config.platform_instance_map,
                self.config.lineage_overrides,
            )
```
The 5th argument is clearly `table_name`, not `full_name`.

) -> str:
if platform == "athena":
upstream_db = ""
database_name = f"{upstream_db}." if upstream_db else ""
final_name = full_name.replace("[", "").replace("]", "")
final_name = table_name.replace("[", "").replace("]", "")

schema_name = f"{schema}." if schema else ""

@@ -573,17 +578,123 @@ def get_fully_qualified_table_name(
return fully_qualified_table_name


def get_platform_instance(
platform: str, platform_instance_map: Optional[Dict[str, str]]
) -> Optional[str]:
if platform_instance_map is not None and platform in platform_instance_map.keys():
return platform_instance_map[platform]
@dataclass
class TableauUpstreamReference:
database: Optional[str]
schema: Optional[str]
table: str

connection_type: str

@classmethod
def create(
Collaborator (author) commented:

Meant to replicate the logic in get_upstream_tables: pulls values out of the table dict, parses the schema from full_name and default_schema_map (and now also parses the database), and does the schema-in-table-name check for Redshift.

cls, d: dict, default_schema_map: Optional[Dict[str, str]] = None
) -> "TableauUpstreamReference":
# Values directly from `table` object from Tableau
database = t_database = d.get(tc.DATABASE, {}).get(tc.NAME)
schema = t_schema = d.get(tc.SCHEMA)
table = t_table = d.get(tc.NAME) or ""
t_full_name = d.get(tc.FULL_NAME)
t_connection_type = d[tc.CONNECTION_TYPE] # required to generate urn
t_id = d[tc.ID]

parsed_full_name = cls.parse_full_name(t_full_name)
if parsed_full_name and len(parsed_full_name) == 3:
database, schema, table = parsed_full_name
elif parsed_full_name and len(parsed_full_name) == 2:
schema, table = parsed_full_name
else:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" Did not parse full name {t_full_name}: unexpected number of values",
)

if not schema and default_schema_map and database in default_schema_map:
schema = default_schema_map[database]

if database != t_database:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing database {t_database} with {database} from full name {t_full_name}"
)
if schema != t_schema:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing schema {t_schema} with {schema} from full name {t_full_name}"
)
if table != t_table:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing table {t_table} with {table} from full name {t_full_name}"
)

# TODO: See if we can remove this -- made for redshift
if (
schema
and t_table
and t_full_name
and t_table == t_full_name
and schema in t_table
):
logger.debug(
f"Omitting schema for upstream table {t_id}, schema included in table name"
)
schema = ""

return cls(
database=database,
schema=schema,
table=table,
connection_type=t_connection_type,
)
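A hedged usage sketch of `create` (the dict keys follow the GraphQL field names assumed by the `tableau_constant` module; all values are invented):

```python
table = {
    "id": "abc-123",
    "name": "orders",
    "fullName": "[mydb].[sales].[orders]",
    "connectionType": "postgres",
    "database": {"name": "mydb"},
}

ref = TableauUpstreamReference.create(
    table, default_schema_map={"mydb": "public"}
)
# fullName parses into three parts, so it wins over the individual fields:
# ref.database == "mydb", ref.schema == "sales", ref.table == "orders"
```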

@staticmethod
def parse_full_name(full_name: Optional[str]) -> Optional[List[str]]:
# fullName is observed to be in formats:
# [database].[schema].[table]
# [schema].[table]
# [table]
# table
# schema

# TODO: Validate the startswith check. Currently required for our integration tests
if full_name is None or not full_name.startswith("["):
return None

return full_name.replace("[", "").replace("]", "").split(".")
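Expected behavior on the formats listed above (a sketch; note that a name containing a literal dot would split into extra parts):

```python
TableauUpstreamReference.parse_full_name("[mydb].[sales].[orders]")  # ['mydb', 'sales', 'orders']
TableauUpstreamReference.parse_full_name("[sales].[orders]")         # ['sales', 'orders']
TableauUpstreamReference.parse_full_name("[orders]")                 # ['orders']
TableauUpstreamReference.parse_full_name("orders")                   # None: fails the startswith check
TableauUpstreamReference.parse_full_name(None)                       # None
```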

def make_dataset_urn(
Collaborator (author) commented:

Basically copied from make_table_urn.

self,
env: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:
(
upstream_db,
platform_instance,
platform,
original_platform,
) = get_overridden_info(
connection_type=self.connection_type,
upstream_db=self.database,
lineage_overrides=lineage_overrides,
platform_instance_map=platform_instance_map,
)

table_name = get_fully_qualified_table_name(
original_platform,
upstream_db or "",
self.schema,
self.table,
)

return None
return builder.make_dataset_urn_with_platform_instance(
platform, table_name, platform_instance, env
)
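Roughly, for a hypothetical Postgres upstream with no platform instance configured (the URN shape assumes the standard `make_dataset_urn_with_platform_instance` format):

```python
ref = TableauUpstreamReference(
    database="mydb", schema="sales", table="orders", connection_type="postgres"
)
urn = ref.make_dataset_urn(env="PROD", platform_instance_map=None)
# roughly: urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.sales.orders,PROD)
```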


def get_overridden_info(
connection_type: str,
connection_type: Optional[str],
upstream_db: Optional[str],
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
@@ -605,43 +716,16 @@ def get_overridden_info(
):
upstream_db = lineage_overrides.database_override_map[upstream_db]

platform_instance = get_platform_instance(original_platform, platform_instance_map)
Collaborator (author) commented:

Removed this method in favor of this one-line statement.

platform_instance = (
platform_instance_map.get(original_platform) if platform_instance_map else None
)

if original_platform in ("athena", "hive", "mysql"): # Two tier databases
upstream_db = None

return upstream_db, platform_instance, platform, original_platform


def make_table_urn(
env: str,
upstream_db: Optional[str],
connection_type: str,
schema: str,
full_name: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:

upstream_db, platform_instance, platform, original_platform = get_overridden_info(
connection_type=connection_type,
upstream_db=upstream_db,
lineage_overrides=lineage_overrides,
platform_instance_map=platform_instance_map,
)

table_name = get_fully_qualified_table_name(
original_platform,
upstream_db if upstream_db is not None else "",
schema,
full_name,
)

return builder.make_dataset_urn_with_platform_instance(
platform, table_name, platform_instance, env
)


def make_description_from_params(description, formula):
"""
Generate column description