Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: "PostgresMetadataExtractor doesn't discover Redshift late binding views" #356

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
116 changes: 116 additions & 0 deletions databuilder/extractor/base_postgres_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import abc
import logging
from collections import namedtuple

from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby


TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class BasePostgresMetadataExtractor(Extractor):
    """
    Extracts Postgres table and column metadata from underlying meta store database
    using SQLAlchemyExtractor.

    Subclasses supply the dialect-specific SQL via :meth:`get_sql_statement`.
    """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    DATABASE_KEY = 'database_key'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
    )

    @abc.abstractmethod
    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> Any:
        """
        Build the dialect-specific SQL used to extract table and column metadata.

        :param use_catalog_as_cluster_name: when True, the SQL should take the cluster
            name from the database catalog rather than the configured cluster key.
        :param where_clause_suffix: extra WHERE-clause text appended to the query.
        :return: the SQL statement (a string in concrete implementations).
        """
        return None

    def init(self, conf: ConfigTree) -> None:
        """
        Resolve configuration, build the SQL statement and initialize the
        underlying SQLAlchemyExtractor.
        """
        conf = conf.with_fallback(BasePostgresMetadataExtractor.DEFAULT_CONFIG)
        # get_string already returns a str; no extra '{}'.format wrapping needed
        self._cluster = conf.get_string(BasePostgresMetadataExtractor.CLUSTER_KEY)

        self._database = conf.get_string(BasePostgresMetadataExtractor.DATABASE_KEY, default='postgres')

        self.sql_stmt = self.get_sql_statement(
            use_catalog_as_cluster_name=conf.get_bool(BasePostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME),
            where_clause_suffix=conf.get_string(BasePostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
        )

        self._alchemy_extractor = SQLAlchemyExtractor()
        # The scoped SQLAlchemy config may override the generated SQL statement
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

        # Lazy %-style args: message is only formatted when INFO logging is enabled
        LOGGER.info('SQL for postgres metadata: %s', self.sql_stmt)

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        """
        :return: the next TableMetadata, or None when all rows are consumed.
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        # next() with a default replaces the try/except StopIteration dance
        return next(self._extract_iter, None)

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Using itertools.groupby and the raw-row iterator, group rows by table and
        yield one TableMetadata per table. groupby requires the rows to arrive
        sorted by the grouping key, which the SQL's ORDER BY provides.
        :return:
        """
        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []

            for row in group:
                # remember the last row of the group for the table-level fields
                last_row = row
                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
                                              row['col_type'], row['col_sort_order']))

            yield TableMetadata(self._database, last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                last_row['description'],
                                columns)

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Provides iterator of result rows from the SQLAlchemy extractor; stops on
        the first falsy row.
        :return:
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        """
        Table key consists of schema and table name.
        :param row: a raw result row, or a falsy value.
        :return: a TableKey, or None for a falsy row.
        """
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])

        return None
141 changes: 26 additions & 115 deletions databuilder/extractor/postgres_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,43 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import logging
from collections import namedtuple
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401

from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby


TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class PostgresMetadataExtractor(Extractor):
class PostgresMetadataExtractor(BasePostgresMetadataExtractor):
"""
Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor
"""
# SELECT statement from postgres information_schema to extract table and column metadata
SQL_STATEMENT = """
SELECT
{cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
,c.column_name as col_name, c.data_type as col_type
, pgcd.description as col_description, ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
INNER JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
LEFT JOIN
pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
{where_clause_suffix}
ORDER by cluster, schema, name, col_sort_order ;
"""

# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
DATABASE_KEY = 'database_key'

# Default values
DEFAULT_CLUSTER_NAME = 'master'

DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
)

def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(PostgresMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(PostgresMetadataExtractor.CLUSTER_KEY))

if conf.get_bool(PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
def get_sql_statement(self, use_catalog_as_cluster_name, where_clause_suffix):
# type: (bool, str) -> str
if use_catalog_as_cluster_name:
cluster_source = "c.table_catalog"
else:
cluster_source = "'{}'".format(self._cluster)

self._database = conf.get_string(PostgresMetadataExtractor.DATABASE_KEY, default='postgres')

self.sql_stmt = PostgresMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
cluster_source=cluster_source
return """
SELECT
{cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
,c.column_name as col_name, c.data_type as col_type
, pgcd.description as col_description, ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
INNER JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
LEFT JOIN
pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
{where_clause_suffix}
ORDER by cluster, schema, name, col_sort_order ;
""".format(
cluster_source=cluster_source,
where_clause_suffix=where_clause_suffix,
)

self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

LOGGER.info('SQL for postgres metadata: {}'.format(self.sql_stmt))

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter: Union[None, Iterator] = None

def extract(self) -> Union[TableMetadata, None]:
    """Return the next TableMetadata record, or None once the iterator is exhausted."""
    if self._extract_iter is None:
        self._extract_iter = self._get_extract_iter()
    # next() with a default value is equivalent to catching StopIteration
    return next(self._extract_iter, None)

def get_scope(self) -> str:
def get_scope(self):
# type: () -> str
return 'extractor.postgres_metadata'

def _get_extract_iter(self) -> Iterator[TableMetadata]:
    """
    Group raw column-level rows by table key (itertools.groupby over the raw
    iterator) and yield one TableMetadata per table.
    :return:
    """
    for _table_key, rows in groupby(self._get_raw_extract_iter(), self._get_table_key):
        column_list = []
        current = None
        for current in rows:
            column_list.append(
                ColumnMetadata(current['col_name'], current['col_description'],
                               current['col_type'], current['col_sort_order']))

        # `current` holds the group's last row; table-level fields come from it
        yield TableMetadata(self._database, current['cluster'],
                            current['schema'],
                            current['name'],
                            current['description'],
                            column_list)

def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
    """
    Yield result rows from the SQLAlchemy extractor until a falsy row ends the stream.
    :return:
    """
    while True:
        result = self._alchemy_extractor.extract()
        if not result:
            break
        yield result

def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
    """
    Build the grouping key (schema + table name) for a raw row.
    :param row:
    :return: a TableKey, or None when the row is falsy.
    """
    return TableKey(schema=row['schema'], table_name=row['name']) if row else None
71 changes: 71 additions & 0 deletions databuilder/extractor/redshift_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401

from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor


class RedshiftMetadataExtractor(BasePostgresMetadataExtractor):
    """
    Extracts Redshift table and column metadata from underlying meta store database using SQLAlchemyExtractor

    This differs from the PostgresMetadataExtractor because in order to support Redshift's late binding views,
    we need to join the INFORMATION_SCHEMA data against the function PG_GET_LATE_BINDING_VIEW_COLS().
    """

    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> str:
        """
        Build the Redshift metadata SQL: regular tables from INFORMATION_SCHEMA
        UNIONed with late-binding view columns from PG_GET_LATE_BINDING_VIEW_COLS().

        :param use_catalog_as_cluster_name: when True, CURRENT_DATABASE() supplies the
            cluster name; otherwise the configured cluster is inlined as a literal.
        :param where_clause_suffix: extra WHERE-clause text appended to the query.
        :return: the formatted SQL statement.
        """
        # Inline annotations replace the Python-2-style `# type:` comment for
        # consistency with BasePostgresMetadataExtractor.get_sql_statement.
        if use_catalog_as_cluster_name:
            cluster_source = "CURRENT_DATABASE()"
        else:
            cluster_source = "'{}'".format(self._cluster)

        return """
        SELECT
            *
        FROM (
            SELECT
              {cluster_source} as cluster,
              c.table_schema as schema,
              c.table_name as name,
              pgtd.description as description,
              c.column_name as col_name,
              c.data_type as col_type,
              pgcd.description as col_description,
              ordinal_position as col_sort_order
            FROM INFORMATION_SCHEMA.COLUMNS c
            INNER JOIN
              pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
            LEFT JOIN
              pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
            LEFT JOIN
              pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0

            UNION

            SELECT
              {cluster_source} as cluster,
              view_schema as schema,
              view_name as name,
              NULL as description,
              column_name as col_name,
              data_type as col_type,
              NULL as col_description,
              ordinal_position as col_sort_order
            FROM
                PG_GET_LATE_BINDING_VIEW_COLS()
                COLS(view_schema NAME, view_name NAME, column_name NAME, data_type VARCHAR, ordinal_position INT)
        )

        {where_clause_suffix}
        ORDER by cluster, schema, name, col_sort_order ;
        """.format(
            cluster_source=cluster_source,
            where_clause_suffix=where_clause_suffix,
        )

    def get_scope(self) -> str:
        """Return the config scope used to namespace this extractor's settings."""
        return 'extractor.redshift_metadata'