Skip to content

Commit

Permalink
Create a RedshiftMetadataExtractor that supports late binding views
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Lawrence authored and Nathan Lawrence committed Sep 2, 2020
1 parent c595680 commit 1641197
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 116 deletions.
108 changes: 108 additions & 0 deletions databuilder/extractor/base_postgres_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import logging
from collections import namedtuple

from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby


TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class BasePostgresMetadataExtractor(Extractor):
"""
Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor
"""

# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
DATABASE_KEY = 'database_key'

# Default values
DEFAULT_CLUSTER_NAME = 'master'

DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
)

def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(BasePostgresMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(BasePostgresMetadataExtractor.CLUSTER_KEY))

self._database = conf.get_string(BasePostgresMetadataExtractor.DATABASE_KEY, default='postgres')

self.sql_stmt = self.get_sql_statement(
where_clause_suffix=conf.get_string(BasePostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
use_catalog_as_cluster_name=conf.get_bool(BasePostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME)
)

self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

LOGGER.info('SQL for postgres metadata: {}'.format(self.sql_stmt))

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter: Union[None, Iterator] = None

def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def _get_extract_iter(self) -> Iterator[TableMetadata]:
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []

for row in group:
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))

yield TableMetadata(self._database, last_row['cluster'],
last_row['schema'],
last_row['name'],
last_row['description'],
columns)

def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()

def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema=row['schema'], table_name=row['name'])

return None
142 changes: 26 additions & 116 deletions databuilder/extractor/postgres_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,42 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401

import logging
from collections import namedtuple
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor

from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby


TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class PostgresMetadataExtractor(Extractor):
class PostgresMetadataExtractor(BasePostgresMetadataExtractor):
"""
Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor
"""
# SELECT statement from postgres information_schema to extract table and column metadata
SQL_STATEMENT = """
SELECT
{cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
,c.column_name as col_name, c.data_type as col_type
, pgcd.description as col_description, ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
INNER JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
LEFT JOIN
pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
{where_clause_suffix}
ORDER by cluster, schema, name, col_sort_order ;
"""

# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster_key'
USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
DATABASE_KEY = 'database_key'

# Default values
DEFAULT_CLUSTER_NAME = 'master'

DEFAULT_CONFIG = ConfigFactory.from_dict(
{WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
)

def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(PostgresMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(PostgresMetadataExtractor.CLUSTER_KEY))

if conf.get_bool(PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
def get_sql_statement(self, use_catalog_as_cluster_name, where_clause_suffix):
# type: (bool, str) -> str
if use_catalog_as_cluster_name:
cluster_source = "c.table_catalog"
else:
cluster_source = "'{}'".format(self._cluster)

self._database = conf.get_string(PostgresMetadataExtractor.DATABASE_KEY, default='postgres')

self.sql_stmt = PostgresMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
cluster_source=cluster_source
return """
SELECT
{cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
,c.column_name as col_name, c.data_type as col_type
, pgcd.description as col_description, ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
INNER JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
LEFT JOIN
pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
{where_clause_suffix}
ORDER by cluster, schema, name, col_sort_order ;
""".format(
cluster_source=cluster_source,
where_clause_suffix=where_clause_suffix,
)

self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

LOGGER.info('SQL for postgres metadata: {}'.format(self.sql_stmt))

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter: Union[None, Iterator] = None

def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self) -> str:
def get_scope(self):
# type: () -> str
return 'extractor.postgres_metadata'

def _get_extract_iter(self) -> Iterator[TableMetadata]:
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []

for row in group:
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))

yield TableMetadata(self._database, last_row['cluster'],
last_row['schema'],
last_row['name'],
last_row['description'],
columns)

def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()

def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema=row['schema'], table_name=row['name'])

return None
71 changes: 71 additions & 0 deletions databuilder/extractor/redshift_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401

from databuilder import Scoped
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor


class RedshiftMetadataExtractor(BasePostgresMetadataExtractor):
"""
Extracts Redshift table and column metadata from underlying meta store database using SQLAlchemyExtractor
This differs from the PostgresMetadataExtractor because in order to support Redshift's late binding views,
we need to join the INFORMATION_SCHEMA data against the function PG_GET_LATE_BINDING_VIEW_COLS().
"""

def get_sql_statement(self, use_catalog_as_cluster_name, where_clause_suffix):
# type: (bool, str) -> str
if use_catalog_as_cluster_name:
cluster_source = "CURRENT_DATABASE()"
else:
cluster_source = "'{}'".format(self._cluster)

# type: () -> str
return """
SELECT
*
FROM (
SELECT
{cluster_source} as cluster,
c.table_schema as schema,
c.table_name as name,
pgtd.description as description,
c.column_name as col_name,
c.data_type as col_type,
pgcd.description as col_description,
ordinal_position as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS c
INNER JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
LEFT JOIN
pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
UNION
SELECT
{cluster_source} as cluster,
view_schema as schema,
view_name as name,
NULL as description,
column_name as col_name,
data_type as col_type,
NULL as col_description,
ordinal_position as col_sort_order
FROM
PG_GET_LATE_BINDING_VIEW_COLS()
COLS(view_schema NAME, view_name NAME, column_name NAME, data_type VARCHAR, ordinal_position INT)
)
{where_clause_suffix}
ORDER by cluster, schema, name, col_sort_order ;
""".format(
cluster_source=cluster_source,
where_clause_suffix=where_clause_suffix,
)

def get_scope(self):
# type: () -> str
return 'extractor.redshift_metadata'

0 comments on commit 1641197

Please sign in to comment.