From 8beee3ea2ba0a81f4cc321791b9185a4316d9193 Mon Sep 17 00:00:00 2001
From: Allison Suarez Miranda <22477579+allisonsuarez@users.noreply.github.com>
Date: Tue, 13 Oct 2020 12:54:19 -0700
Subject: [PATCH] feat: column level badges (#375)

* changes for column level badges, not tested yet
Signed-off-by: Allison Suarez Miranda

* table metadata changes to badges
Signed-off-by: Allison Suarez Miranda

* changing and adding tests
Signed-off-by: Allison Suarez Miranda

* lint fixes
Signed-off-by: Allison Suarez Miranda

* rename
Signed-off-by: Allison Suarez Miranda

* updated HiveTableMetadataExtractor to add column level badge for partition column
Signed-off-by: Allison Suarez Miranda

* changed partition col condition
Signed-off-by: Allison Suarez Miranda

* fixed most tests, still trying to figure out how to properly link column and badge nodes on model
Signed-off-by: Allison Suarez Miranda

* almost there
Signed-off-by: Allison Suarez Miranda

* using create method for badge node
Signed-off-by: Allison Suarez Miranda

* more changes
Signed-off-by: Allison Suarez Miranda

* docstring
Signed-off-by: Allison Suarez Miranda

* almost there..?
Signed-off-by: Allison Suarez Miranda

* fix
Signed-off-by: Allison Suarez Miranda

* works now, still need to fix tests and clean up
Signed-off-by: Allison Suarez Miranda

* fixed lint issues
Signed-off-by: Allison Suarez Miranda

* missing 1 test to fix
Signed-off-by: Allison Suarez Miranda

* still failing because cluster and db now missing in actual
Signed-off-by: Allison Suarez Miranda

* pushing whatever I have
Signed-off-by: Allison Suarez Miranda

* fix: fix unit test cleanup
Signed-off-by: feng-tao

* removed print statements
Signed-off-by: Allison Suarez Miranda

* lint fixes
Signed-off-by: Allison Suarez Miranda

* removed empty badge list
Signed-off-by: Allison Suarez Miranda

* updated test
Signed-off-by: Allison Suarez Miranda

* fixed tests pr 2
Signed-off-by: Allison Suarez Miranda

* added constants file for partition column string
Signed-off-by: Allison Suarez Miranda

* lint
Signed-off-by: Allison Suarez Miranda

Co-authored-by: feng-tao
---
 .../hive_table_metadata_extractor.py              | 12 ++-
 .../extractor/table_metadata_constants.py         |  5 +
 databuilder/models/table_metadata.py              | 95 ++++++++++++++-----
 .../test_hive_table_metadata_extractor.py         | 59 ++++++++----
 tests/unit/models/test_table_metadata.py          | 51 +++++++---
 5 files changed, 162 insertions(+), 60 deletions(-)
 create mode 100644 databuilder/extractor/table_metadata_constants.py

diff --git a/databuilder/extractor/hive_table_metadata_extractor.py b/databuilder/extractor/hive_table_metadata_extractor.py
index ccf3b4ae4..f6820a29b 100644
--- a/databuilder/extractor/hive_table_metadata_extractor.py
+++ b/databuilder/extractor/hive_table_metadata_extractor.py
@@ -8,6 +8,7 @@
 from typing import Iterator, Union, Dict, Any
 
 from databuilder import Scoped
+from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
 from databuilder.extractor.base_extractor import Extractor
 from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
 from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
@@ -101,8 +102,15 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
             for row in group:
                 last_row = row
-                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
-                                              row['col_type'], row['col_sort_order']))
+                column = None
+                if row['is_partition_col'] == 1:
+                    # add a badge to indicate the partition column
+                    column = ColumnMetadata(row['col_name'], row['col_description'],
+                                            row['col_type'], row['col_sort_order'], [PARTITION_BADGE])
+                else:
+                    column = ColumnMetadata(row['col_name'], row['col_description'],
+                                            row['col_type'], row['col_sort_order'])
+                columns.append(column)
 
             is_view = last_row['is_view'] == 1
             yield TableMetadata('hive', self._cluster,
                                 last_row['schema'],
diff --git a/databuilder/extractor/table_metadata_constants.py b/databuilder/extractor/table_metadata_constants.py
new file mode 100644
index 000000000..b2a08766e
--- /dev/null
+++ b/databuilder/extractor/table_metadata_constants.py
@@ -0,0 +1,5 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
+
+# String for partition column badge
+PARTITION_BADGE = 'partition column'
diff --git a/databuilder/models/table_metadata.py b/databuilder/models/table_metadata.py
index 7dfacb163..a26f0e8ff 100644
--- a/databuilder/models/table_metadata.py
+++ b/databuilder/models/table_metadata.py
@@ -17,6 +17,55 @@
 DESCRIPTION_NODE_LABEL = DESCRIPTION_NODE_LABEL_VAL
 
 
+class BadgeMetadata(Neo4jCsvSerializable):
+    BADGE_NODE_LABEL = 'Badge'
+    BADGE_KEY_FORMAT = '{badge}'
+    BADGE_CATEGORY = 'category'
+    DASHBOARD_TYPE = 'dashboard'
+    METRIC_TYPE = 'metric'
+
+    def __init__(self,
+                 name: str,
+                 category: str,
+                 ):
+        self._name = name
+        self._category = category
+        self._nodes = iter([self.create_badge_node(self._name)])
+        self._relations: Iterator[Dict[str, Any]] = iter([])
+
+    def __repr__(self) -> str:
+        return 'BadgeMetadata({!r}, {!r})'.format(self._name,
+                                                  self._category)
+
+    @staticmethod
+    def get_badge_key(name: str) -> str:
+        if not name:
+            return ''
+        return BadgeMetadata.BADGE_KEY_FORMAT.format(badge=name)
+
+    @staticmethod
+    def create_badge_node(name: str,
+                          category: str = 'column',
+                          ) -> Dict[str, str]:
+        return {NODE_LABEL: BadgeMetadata.BADGE_NODE_LABEL,
+                NODE_KEY: BadgeMetadata.get_badge_key(name),
+                BadgeMetadata.BADGE_CATEGORY: category}
+
+    def create_next_node(self) -> Optional[Dict[str, Any]]:
+        # return the string representation of the data
+        try:
+            return next(self._nodes)
+        except StopIteration:
+            return None
+
+    def create_next_relation(self) -> Optional[Dict[str, Any]]:
+        # We don't emit any relations for Badge ingestion
+        try:
+            return next(self._relations)
+        except StopIteration:
+            return None
+
+
 class TagMetadata(Neo4jCsvSerializable):
     TAG_NODE_LABEL = 'Tag'
     TAG_KEY_FORMAT = '{tag}'
@@ -150,16 +199,16 @@ class ColumnMetadata:
     COLUMN_DESCRIPTION = 'description'
     COLUMN_DESCRIPTION_FORMAT = '{db}://{cluster}.{schema}/{tbl}/{col}/{description_id}'
 
-    # Relation between column and tag
-    COL_TAG_RELATION_TYPE = 'TAGGED_BY'
-    TAG_COL_RELATION_TYPE = 'TAG'
+    # Relation between column and badge
+    COL_BADGE_RELATION_TYPE = 'HAS_BADGE'
+    BADGE_COL_RELATION_TYPE = 'BADGE_FOR'
 
     def __init__(self,
                  name: str,
                  description: Union[str, None],
                  col_type: str,
                  sort_order: int,
-                 tags: Union[List[str], None] = None
+                 badges: Union[List[str], None] = None
                  ) -> None:
         """
         TODO: Add stats
@@ -173,13 +222,14 @@ def __init__(self,
                                                                             text=description)
         self.type = col_type
         self.sort_order = sort_order
-        self.tags = tags
+        self.badges = badges
 
     def __repr__(self) -> str:
-        return 'ColumnMetadata({!r}, {!r}, {!r}, {!r})'.format(self.name,
-                                                               self.description,
-                                                               self.type,
-                                                               self.sort_order)
+        return 'ColumnMetadata({!r}, {!r}, {!r}, {!r}, {!r})'.format(self.name,
+                                                                     self.description,
+                                                                     self.type,
+                                                                     self.sort_order,
+                                                                     self.badges)
 
 
 # Tuples for de-dupe purpose on Database, Cluster, Schema. See TableMetadata docstring for more information
@@ -313,7 +363,8 @@ def _get_col_key(self, col: ColumnMetadata) -> str:
                                                 cluster=self.cluster,
                                                 schema=self.schema,
                                                 tbl=self.name,
-                                                col=col.name)
+                                                col=col.name,
+                                                badges=col.badges)
 
     def _get_col_description_key(self,
                                  col: ColumnMetadata,
@@ -375,11 +426,9 @@ def _create_next_node(self) -> Iterator[Any]:  # noqa: C901
                 node_key = self._get_col_description_key(col, col.description)
                 yield col.description.get_node_dict(node_key)
 
-            if col.tags:
-                for tag in col.tags:
-                    yield {NODE_LABEL: TagMetadata.TAG_NODE_LABEL,
-                           NODE_KEY: TagMetadata.get_tag_key(tag),
-                           TagMetadata.TAG_TYPE: 'default'}
+            if col.badges:
+                for badge in col.badges:
+                    yield BadgeMetadata.create_badge_node(badge)
 
         # Database, cluster, schema
         others = [NodeTuple(key=self._get_database_key(),
@@ -450,15 +499,15 @@ def _create_next_relation(self) -> Iterator[Any]:
                     self._get_col_key(col),
                     self._get_col_description_key(col, col.description))
 
-            if col.tags:
-                for tag in col.tags:
+            if col.badges:
+                for badge in col.badges:
                     yield {
-                        RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
-                        RELATION_END_LABEL: TagMetadata.TAG_NODE_LABEL,
-                        RELATION_START_KEY: self._get_table_key(),
-                        RELATION_END_KEY: TagMetadata.get_tag_key(tag),
-                        RELATION_TYPE: ColumnMetadata.COL_TAG_RELATION_TYPE,
-                        RELATION_REVERSE_TYPE: ColumnMetadata.TAG_COL_RELATION_TYPE,
+                        RELATION_START_LABEL: ColumnMetadata.COLUMN_NODE_LABEL,
+                        RELATION_END_LABEL: BadgeMetadata.BADGE_NODE_LABEL,
+                        RELATION_START_KEY: self._get_col_key(col),
+                        RELATION_END_KEY: BadgeMetadata.get_badge_key(badge),
+                        RELATION_TYPE: ColumnMetadata.COL_BADGE_RELATION_TYPE,
+                        RELATION_REVERSE_TYPE: ColumnMetadata.BADGE_COL_RELATION_TYPE,
                     }
 
         others = [
diff --git a/tests/unit/extractor/test_hive_table_metadata_extractor.py b/tests/unit/extractor/test_hive_table_metadata_extractor.py
index 93b4d437b..f3c81a458 100644
--- a/tests/unit/extractor/test_hive_table_metadata_extractor.py
+++ b/tests/unit/extractor/test_hive_table_metadata_extractor.py
@@ -50,32 +50,38 @@ def test_extraction_with_single_result(self) -> None:
                 {'col_name': 'col_id1',
                  'col_type': 'bigint',
                  'col_description': 'description of id1',
-                 'col_sort_order': 0}, table),
+                 'col_sort_order': 0,
+                 'is_partition_col': 0}, table),
             self._union(
                 {'col_name': 'col_id2',
                  'col_type': 'bigint',
                  'col_description': 'description of id2',
-                 'col_sort_order': 1}, table),
+                 'col_sort_order': 1,
+                 'is_partition_col': 0}, table),
             self._union(
                 {'col_name': 'is_active',
                  'col_type': 'boolean',
                  'col_description': None,
-                 'col_sort_order': 2}, table),
+                 'col_sort_order': 2,
+                 'is_partition_col': 1}, table),
            self._union(
                 {'col_name': 'source',
                  'col_type': 'varchar',
                  'col_description': 'description of source',
-                 'col_sort_order': 3}, table),
+                 'col_sort_order': 3,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'etl_created_at',
                  'col_type': 'timestamp',
                  'col_description': 'description of etl_created_at',
-                 'col_sort_order': 4}, table),
+                 'col_sort_order': 4,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'ds',
                  'col_type': 'varchar',
                  'col_description': None,
-                 'col_sort_order': 5}, table)
+                 'col_sort_order': 5,
+                 'is_partition_col': 0}, table)
         ]
 
         extractor = HiveTableMetadataExtractor()
@@ -84,9 +90,10 @@ def test_extraction_with_single_result(self) -> None:
         expected = TableMetadata('hive', 'gold', 'test_schema', 'test_table', 'a table for testing',
                                  [ColumnMetadata('col_id1', 'description of id1', 'bigint', 0),
                                   ColumnMetadata('col_id2', 'description of id2', 'bigint', 1),
-                                  ColumnMetadata('is_active', None, 'boolean', 2),
+                                  ColumnMetadata('is_active', None, 'boolean', 2, ['partition column']),
                                   ColumnMetadata('source', 'description of source', 'varchar', 3),
-                                  ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
+                                  ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp',
+                                                 4),
                                   ColumnMetadata('ds', None, 'varchar', 5)],
                                  is_view=False)
         self.assertEqual(expected.__repr__(), actual.__repr__())
@@ -118,63 +125,75 @@ def test_extraction_with_multiple_result(self) -> None:
                 {'col_name': 'col_id1',
                  'col_type': 'bigint',
                  'col_description': 'description of col_id1',
-                 'col_sort_order': 0}, table),
+                 'col_sort_order': 0,
+                 'is_partition_col': 1}, table),
            self._union(
                 {'col_name': 'col_id2',
                  'col_type': 'bigint',
                  'col_description': 'description of col_id2',
-                 'col_sort_order': 1}, table),
+                 'col_sort_order': 1,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'is_active',
                  'col_type': 'boolean',
                  'col_description': None,
-                 'col_sort_order': 2}, table),
+                 'col_sort_order': 2,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'source',
                  'col_type': 'varchar',
                  'col_description': 'description of source',
-                 'col_sort_order': 3}, table),
+                 'col_sort_order': 3,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'etl_created_at',
                  'col_type': 'timestamp',
                  'col_description': 'description of etl_created_at',
-                 'col_sort_order': 4}, table),
+                 'col_sort_order': 4,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'ds',
                  'col_type': 'varchar',
                  'col_description': None,
-                 'col_sort_order': 5}, table),
+                 'col_sort_order': 5,
+                 'is_partition_col': 0}, table),
            self._union(
                 {'col_name': 'col_name',
                  'col_type': 'varchar',
                  'col_description': 'description of col_name',
-                 'col_sort_order': 0}, table1),
+                 'col_sort_order': 0,
+                 'is_partition_col': 0}, table1),
            self._union(
                 {'col_name': 'col_name2',
                  'col_type': 'varchar',
                  'col_description': 'description of col_name2',
-                 'col_sort_order': 1}, table1),
+                 'col_sort_order': 1,
+                 'is_partition_col': 0}, table1),
            self._union(
                 {'col_name': 'col_id3',
                  'col_type': 'varchar',
                  'col_description': 'description of col_id3',
-                 'col_sort_order': 0}, table2),
+                 'col_sort_order': 0,
+                 'is_partition_col': 0}, table2),
            self._union(
                 {'col_name': 'col_name3',
                  'col_type': 'varchar',
                  'col_description': 'description of col_name3',
-                 'col_sort_order': 1}, table2)
+                 'col_sort_order': 1,
+                 'is_partition_col': 0}, table2)
         ]
 
         extractor = HiveTableMetadataExtractor()
         extractor.init(self.conf)
 
         expected = TableMetadata('hive', 'gold', 'test_schema1', 'test_table1', 'test table 1',
-                                 [ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0),
+                                 [ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0,
+                                                 ['partition column']),
                                   ColumnMetadata('col_id2', 'description of col_id2', 'bigint', 1),
                                   ColumnMetadata('is_active', None, 'boolean', 2),
                                   ColumnMetadata('source', 'description of source', 'varchar', 3),
-                                  ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
+                                  ColumnMetadata('etl_created_at', 'description of etl_created_at',
+                                                 'timestamp', 4),
                                   ColumnMetadata('ds', None, 'varchar', 5)],
                                  is_view=False)
         self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
diff --git a/tests/unit/models/test_table_metadata.py b/tests/unit/models/test_table_metadata.py
index 52112c6ea..1c9bdfbea 100644
--- a/tests/unit/models/test_table_metadata.py
+++ b/tests/unit/models/test_table_metadata.py
@@ -10,6 +10,8 @@ class TestTableMetadata(unittest.TestCase):
 
     def setUp(self) -> None:
         super(TestTableMetadata, self).setUp()
+        TableMetadata.serialized_nodes = set()
+        TableMetadata.serialized_rels = set()
 
     def test_serialize(self) -> None:
         self.table_metadata = TableMetadata('hive', 'gold', 'test_schema1', 'test_table1', 'test_table1', [
@@ -119,8 +121,6 @@ def test_serialize(self) -> None:
             actual.append(relation_row)
             relation_row = self.table_metadata.next_relation()
         for i in range(0, len(self.expected_rels)):
-            print(self.expected_rels[i])
-            print(actual[i])
             self.assertEqual(actual[i], self.expected_rels[i])
 
         # 2nd record should not show already serialized database, cluster, and schema
@@ -180,7 +180,7 @@ def test_z_custom_sources(self) -> None:
 
     def test_tags_field(self) -> None:
         self.table_metadata4 = TableMetadata('hive', 'gold', 'test_schema4', 'test_table4', 'test_table4', [
-            ColumnMetadata('test_id1', 'description of test_table1', 'bigint', 0, ['col-tag1', 'col-tag2'])],
+            ColumnMetadata('test_id1', 'description of test_table1', 'bigint', 0)],
             is_view=False, tags=['tag1', 'tag2'], attr1='uri', attr2='attr2')
 
         node_row = self.table_metadata4.next_node()
@@ -195,8 +195,6 @@ def test_tags_field(self) -> None:
         self.assertEqual(actual[2].get('LABEL'), 'Tag')
         self.assertEqual(actual[2].get('KEY'), 'tag1')
         self.assertEqual(actual[3].get('KEY'), 'tag2')
-        self.assertEqual(actual[6].get('KEY'), 'col-tag1')
-        self.assertEqual(actual[7].get('KEY'), 'col-tag2')
 
         relation_row = self.table_metadata4.next_relation()
         actual = []
@@ -211,18 +209,41 @@ def test_tags_field(self) -> None:
         expected_tab_tag_rel2 = {'END_KEY': 'tag2', 'START_LABEL': 'Table',
                                  'END_LABEL': 'Tag',
                                  'START_KEY': 'hive://gold.test_schema4/test_table4',
                                  'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
-        expected_col_tag_rel1 = {'END_KEY': 'col-tag1', 'START_LABEL': 'Table',
-                                 'END_LABEL': 'Tag',
-                                 'START_KEY': 'hive://gold.test_schema4/test_table4',
-                                 'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
-        expected_col_tag_rel2 = {'END_KEY': 'col-tag2', 'START_LABEL': 'Table',
-                                 'END_LABEL': 'Tag',
-                                 'START_KEY': 'hive://gold.test_schema4/test_table4',
-                                 'TYPE': 'TAGGED_BY', 'REVERSE_TYPE': 'TAG'}
+
         self.assertEqual(actual[2], expected_tab_tag_rel1)
         self.assertEqual(actual[3], expected_tab_tag_rel2)
-        self.assertEqual(actual[6], expected_col_tag_rel1)
-        self.assertEqual(actual[7], expected_col_tag_rel2)
+
+    def test_col_badge_field(self) -> None:
+        self.table_metadata4 = TableMetadata('hive', 'gold', 'test_schema4', 'test_table4', 'test_table4', [
+            ColumnMetadata('test_id1', 'description of test_table1', 'bigint', 0, ['col-badge1', 'col-badge2'])],
+            is_view=False, attr1='uri', attr2='attr2')
+
+        node_row = self.table_metadata4.next_node()
+        actual = []
+        while node_row:
+            actual.append(node_row)
+            node_row = self.table_metadata4.next_node()
+
+        self.assertEqual(actual[4].get('KEY'), 'col-badge1')
+        self.assertEqual(actual[5].get('KEY'), 'col-badge2')
+
+        relation_row = self.table_metadata4.next_relation()
+        actual = []
+        while relation_row:
+            actual.append(relation_row)
+            relation_row = self.table_metadata4.next_relation()
+
+        expected_col_badge_rel1 = {'END_KEY': 'col-badge1', 'START_LABEL': 'Column',
+                                   'END_LABEL': 'Badge',
+                                   'START_KEY': 'hive://gold.test_schema4/test_table4/test_id1',
+                                   'TYPE': 'HAS_BADGE', 'REVERSE_TYPE': 'BADGE_FOR'}
+        expected_col_badge_rel2 = {'END_KEY': 'col-badge2', 'START_LABEL': 'Column',
+                                   'END_LABEL': 'Badge',
+                                   'START_KEY': 'hive://gold.test_schema4/test_table4/test_id1',
+                                   'TYPE': 'HAS_BADGE', 'REVERSE_TYPE': 'BADGE_FOR'}
+
+        self.assertEqual(actual[4], expected_col_badge_rel1)
+        self.assertEqual(actual[5], expected_col_badge_rel2)
 
     def test_tags_populated_from_str(self) -> None:
         self.table_metadata5 = TableMetadata('hive', 'gold', 'test_schema5', 'test_table5', 'test_table5', [
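
Usage sketch (illustrative only, not part of the patch): a minimal example of how an ingestion job could attach the new column-level badge, assuming only what this diff introduces (the badges argument on ColumnMetadata, the PARTITION_BADGE constant, and the Column-to-Badge relations asserted in test_col_badge_field). The database, cluster, schema, table, and column names below are made up for illustration.

# Minimal sketch: a table whose 'ds' column carries the partition-column badge.
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

columns = [
    ColumnMetadata('user_id', 'unique user id', 'bigint', 0),
    # The optional fifth argument is a list of badge names; each name becomes a
    # Badge node plus a Column -> Badge relation (HAS_BADGE / BADGE_FOR).
    ColumnMetadata('ds', 'partition date', 'varchar', 1, [PARTITION_BADGE]),
]
table = TableMetadata('hive', 'gold', 'example_schema', 'example_table',
                      'an example table', columns, is_view=False)

# Drain the serializable the same way the unit tests above do; the output now
# includes a Badge node keyed 'partition column' and its relation to the 'ds'
# column key, alongside the usual table/column/description records.
node = table.next_node()
while node:
    print(node)
    node = table.next_node()

relation = table.next_relation()
while relation:
    print(relation)
    relation = table.next_relation()

Note the design change relative to the old column tags: the badge relation starts from the Column node itself (START_LABEL 'Column', TYPE 'HAS_BADGE') rather than from the Table node, which is exactly what test_col_badge_field asserts.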