diff --git a/README.md b/README.md
index 0f84613cd..5526650af 100644
--- a/README.md
+++ b/README.md
@@ -314,6 +314,27 @@ job = DefaultJob(
 job.launch()
 ```
 
+#### [SnowflakeTableLastUpdatedExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_table_last_updated_extractor.py "SnowflakeTableLastUpdatedExtractor")
+An extractor that extracts the table last updated timestamp from a Snowflake database.
+
+It uses the same configs as the `SnowflakeMetadataExtractor` described above.
+
+The SQL query driving the extraction is defined [here](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/snowflake_table_last_updated_extractor.py).
+
+```python
+job_config = ConfigFactory.from_dict({
+    'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY): 'YourDbName',
+    'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
+    'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
+    'extractor.snowflake_table_last_updated.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
+job = DefaultJob(
+    conf=job_config,
+    task=DefaultTask(
+        extractor=SnowflakeTableLastUpdatedExtractor(),
+        loader=AnyLoader()))
+job.launch()
+```
+
 #### [BigQueryMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/bigquery_metadata_extractor.py "BigQuery Metadata Extractor")
 An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a BigQuery database.
 
diff --git a/databuilder/extractor/snowflake_table_last_updated_extractor.py b/databuilder/extractor/snowflake_table_last_updated_extractor.py
new file mode 100644
index 000000000..5a8242048
--- /dev/null
+++ b/databuilder/extractor/snowflake_table_last_updated_extractor.py
@@ -0,0 +1,106 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+
+from pyhocon import ConfigFactory, ConfigTree
+from typing import Iterator, Union
+
+from databuilder import Scoped
+from databuilder.extractor.base_extractor import Extractor
+from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
+from databuilder.models.table_last_updated import TableLastUpdated
+
+LOGGER = logging.getLogger(__name__)
+
+
+class SnowflakeTableLastUpdatedExtractor(Extractor):
+    """
+    Extracts Snowflake table last update time from INFORMATION_SCHEMA metadata tables using SQLAlchemyExtractor.
+    Requirements:
+        snowflake-connector-python
+        snowflake-sqlalchemy
+    """
+    # The 'last_altered' column in the 'TABLES' metadata view under 'INFORMATION_SCHEMA' contains the last time the
+    # table was updated (both DML and DDL updates). The query below fetches that column for each table.
+    SQL_STATEMENT = """
+    SELECT
+        lower({cluster_source}) AS cluster,
+        lower(t.table_schema) AS schema,
+        lower(t.table_name) AS table_name,
+        DATE_PART(EPOCH, t.last_altered) AS last_updated_time
+    FROM
+        {database}.INFORMATION_SCHEMA.TABLES t
+    {where_clause_suffix};
+    """
+
+    # CONFIG KEYS
+    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
+    CLUSTER_KEY = 'cluster_key'
+    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
+    # Database Key, used to identify the database type in the UI.
+ DATABASE_KEY = 'database_key' + # Snowflake Database Key, used to determine which Snowflake database to connect to. + SNOWFLAKE_DATABASE_KEY = 'snowflake_database' + + # Default values + DEFAULT_CLUSTER_NAME = 'master' + + DEFAULT_CONFIG = ConfigFactory.from_dict( + {WHERE_CLAUSE_SUFFIX_KEY: ' ', + CLUSTER_KEY: DEFAULT_CLUSTER_NAME, + USE_CATALOG_AS_CLUSTER_NAME: True, + DATABASE_KEY: 'snowflake', + SNOWFLAKE_DATABASE_KEY: 'prod'} + ) + + def init(self, conf: ConfigTree) -> None: + conf = conf.with_fallback(SnowflakeTableLastUpdatedExtractor.DEFAULT_CONFIG) + + if conf.get_bool(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME): + cluster_source = "t.table_catalog" + else: + cluster_source = "'{}'".format(conf.get_string(SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY)) + + self._database = conf.get_string(SnowflakeTableLastUpdatedExtractor.DATABASE_KEY) + self._snowflake_database = conf.get_string(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY) + + self.sql_stmt = SnowflakeTableLastUpdatedExtractor.SQL_STATEMENT.format( + where_clause_suffix=conf.get_string(SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY), + cluster_source=cluster_source, + database=self._snowflake_database + ) + + LOGGER.info('SQL for snowflake table last updated timestamp: {}'.format(self.sql_stmt)) + + # use an sql_alchemy_extractor to execute sql + self._alchemy_extractor = SQLAlchemyExtractor() + sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \ + .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt})) + + self._alchemy_extractor.init(sql_alch_conf) + self._extract_iter: Union[None, Iterator] = None + + def extract(self) -> Union[TableLastUpdated, None]: + if not self._extract_iter: + self._extract_iter = self._get_extract_iter() + try: + return next(self._extract_iter) + except StopIteration: + return None + + def get_scope(self) -> str: + return 'extractor.snowflake_table_last_updated' + + def _get_extract_iter(self) -> Iterator[TableLastUpdated]: + """ + Provides iterator of result row from SQLAlchemy extractor + """ + tbl_last_updated_row = self._alchemy_extractor.extract() + while tbl_last_updated_row: + yield TableLastUpdated(table_name=tbl_last_updated_row['table_name'], + last_updated_time_epoch=tbl_last_updated_row['last_updated_time'], + schema=tbl_last_updated_row['schema'], + db=self._database, + cluster=tbl_last_updated_row['cluster']) + tbl_last_updated_row = self._alchemy_extractor.extract() diff --git a/tests/unit/extractor/test_snowflake_table_last_updated_extractor.py b/tests/unit/extractor/test_snowflake_table_last_updated_extractor.py new file mode 100644 index 000000000..d9e597c1d --- /dev/null +++ b/tests/unit/extractor/test_snowflake_table_last_updated_extractor.py @@ -0,0 +1,298 @@ +# Copyright Contributors to the Amundsen project. 
+# SPDX-License-Identifier: Apache-2.0 + +import unittest + +from mock import patch, MagicMock +from pyhocon import ConfigFactory + +from databuilder.extractor.snowflake_table_last_updated_extractor import SnowflakeTableLastUpdatedExtractor +from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor +from databuilder.models.table_last_updated import TableLastUpdated + + +class TestSnowflakeTableLastUpdatedExtractor(unittest.TestCase): + def setUp(self) -> None: + config_dict = { + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION', + 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY): + 'MY_CLUSTER', + 'extractor.snowflake_table_last_updated.{}'.format( + SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME): + False, + 'extractor.snowflake_table_last_updated.{}'.format( + SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY): + 'prod' + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_extraction_with_empty_query_result(self) -> None: + """ + Test Extraction with empty result from query + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + + results = extractor.extract() + self.assertIsNone(results) + + def test_extraction_with_single_result(self) -> None: + """ + Test Extraction with default cluster and database and with one table as result + """ + with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection: + connection = MagicMock() + mock_connection.return_value = connection + sql_execute = MagicMock() + connection.execute = sql_execute + sql_execute.return_value = [ + {'schema': 'test_schema', + 'table_name': 'test_table', + 'last_updated_time': 1000, + 'cluster': self.conf['extractor.snowflake_table_last_updated.{}'.format( + SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY)], + } + ] + + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + actual = extractor.extract() + + expected = TableLastUpdated(schema='test_schema', table_name='test_table', + last_updated_time_epoch=1000, + db='snowflake', cluster='MY_CLUSTER') + self.assertEqual(expected.__repr__(), actual.__repr__()) + self.assertIsNone(extractor.extract()) + + def test_extraction_with_multiple_result(self) -> None: + """ + Test Extraction with default cluster and database and with multiple tables as result + """ + with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection: + connection = MagicMock() + mock_connection.return_value = connection + sql_execute = MagicMock() + connection.execute = sql_execute + + default_cluster = self.conf['extractor.snowflake_table_last_updated.{}'.format( + SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY)] + + table = {'schema': 'test_schema1', + 'table_name': 'test_table1', + 'last_updated_time': 1000, + 'cluster': default_cluster + } + + table1 = {'schema': 'test_schema1', + 'table_name': 'test_table2', + 'last_updated_time': 2000, + 'cluster': default_cluster + } + + table2 = {'schema': 'test_schema2', + 'table_name': 'test_table3', + 'last_updated_time': 3000, + 'cluster': default_cluster + } + + sql_execute.return_value = [table, table1, table2] + + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + + expected = TableLastUpdated(schema='test_schema1', table_name='test_table1', + last_updated_time_epoch=1000, + db='snowflake', cluster='MY_CLUSTER') + 
self.assertEqual(expected.__repr__(), extractor.extract().__repr__()) + + expected = TableLastUpdated(schema='test_schema1', table_name='test_table2', + last_updated_time_epoch=2000, + db='snowflake', cluster='MY_CLUSTER') + self.assertEqual(expected.__repr__(), extractor.extract().__repr__()) + + expected = TableLastUpdated(schema='test_schema2', table_name='test_table3', + last_updated_time_epoch=3000, + db='snowflake', cluster='MY_CLUSTER') + self.assertEqual(expected.__repr__(), extractor.extract().__repr__()) + + self.assertIsNone(extractor.extract()) + + +class TestSnowflakeTableLastUpdatedExtractorWithWhereClause(unittest.TestCase): + """ + Test 'where_clause' config key in extractor + """ + def setUp(self) -> None: + self.where_clause_suffix = """ + where table_schema in ('public') and table_name = 'movies' + """ + + config_dict = { + SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY: self.where_clause_suffix, + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION' + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_sql_statement(self) -> None: + """ + test where clause in extractor sql statement + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + self.assertTrue(self.where_clause_suffix in extractor.sql_stmt) + + +class TestSnowflakeTableLastUpdatedExtractorClusterKeyNoTableCatalog(unittest.TestCase): + """ + Test with 'USE_CATALOG_AS_CLUSTER_NAME' is false and 'CLUSTER_KEY' is specified + """ + def setUp(self) -> None: + self.cluster_key = "not_master" + + config_dict = { + SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY: self.cluster_key, + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION', + SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME: False + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_sql_statement(self) -> None: + """ + Test cluster_key in extractor sql stmt + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + self.assertTrue(self.cluster_key in extractor.sql_stmt) + + +class TestSnowflakeTableLastUpdatedExtractorDefaultSnowflakeDatabaseKey(unittest.TestCase): + """ + Test with SNOWFLAKE_DATABASE_KEY config specified + """ + def setUp(self) -> None: + self.snowflake_database_key = "not_prod" + + config_dict = { + SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY: self.snowflake_database_key, + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION' + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_sql_statement(self) -> None: + """ + Test SNOWFLAKE_DATABASE_KEY in extractor sql stmt + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + self.assertTrue(self.snowflake_database_key in extractor.sql_stmt) + + +class TestSnowflakeTableLastUpdatedExtractorDefaultDatabaseKey(unittest.TestCase): + """ + Test with DATABASE_KEY config specified + """ + def setUp(self) -> None: + self.database_key = 'not_snowflake' + + config_dict = { + SnowflakeTableLastUpdatedExtractor.DATABASE_KEY: self.database_key, + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION' + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_sql_statement(self) -> None: + """ + Test DATABASE_KEY in 
extractor sql stmt + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + self.assertFalse(self.database_key in extractor.sql_stmt) + + def test_extraction_with_database_specified(self) -> None: + """ + Test DATABASE_KEY in extractor result + """ + with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection: + connection = MagicMock() + mock_connection.return_value = connection + sql_execute = MagicMock() + connection.execute = sql_execute + + sql_execute.return_value = [ + {'schema': 'test_schema', + 'table_name': 'test_table', + 'last_updated_time': 1000, + 'cluster': 'MY_CLUSTER', + } + ] + + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + actual = extractor.extract() + expected = TableLastUpdated(schema='test_schema', table_name='test_table', + last_updated_time_epoch=1000, + db=self.database_key, cluster='MY_CLUSTER') + self.assertEqual(expected.__repr__(), actual.__repr__()) + self.assertIsNone(extractor.extract()) + + +class TestSnowflakeTableLastUpdatedExtractorNoClusterKeyNoTableCatalog(unittest.TestCase): + """ + Test when USE_CATALOG_AS_CLUSTER_NAME is false and CLUSTER_KEY is NOT specified + """ + def setUp(self) -> None: + config_dict = { + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION', + SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME: False + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_sql_statement(self) -> None: + """ + Test cluster name in extract sql stmt + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + self.assertTrue(SnowflakeTableLastUpdatedExtractor.DEFAULT_CLUSTER_NAME in extractor.sql_stmt) + + +class TestSnowflakeTableLastUpdatedExtractorTableCatalogEnabled(unittest.TestCase): + """ + Test when USE_CATALOG_AS_CLUSTER_NAME is true (CLUSTER_KEY should be ignored) + """ + def setUp(self) -> None: + self.cluster_key = "not_master" + + config_dict = { + SnowflakeTableLastUpdatedExtractor.CLUSTER_KEY: self.cluster_key, + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION', + SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME: True + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_sql_statement(self) -> None: + """ + Ensure catalog is used as cluster in extract sql stmt + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = SnowflakeTableLastUpdatedExtractor() + extractor.init(self.conf) + self.assertTrue('table_catalog' in extractor.sql_stmt) + self.assertFalse(self.cluster_key in extractor.sql_stmt) + + +if __name__ == '__main__': + unittest.main()