feat: column level badges (#375)
* changes for column level badges, not tested yet

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* table metadata changes to badges

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* changing and adding tests

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* lint fixes

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* rename

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* updated HiveTableMetadataExtractor to add column level badge for partition column

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* changed partition col condition

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* fixed most tests, still trying to figure out how to properly link column and badge nodes on the model

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* almost there

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* using create method for badge node

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* more changes

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* docstring

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* almost there..?

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* fix

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* works now, still need to fix tests and clean up

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* fixed lint issues

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* missing 1 test to fix

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* still failing because cluster and db now missing in actual

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* pushing whatever I have

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* fix: fix unit test cleanup

Signed-off-by: feng-tao <fengtao04@gmail.com>

* removed print statements

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* lint fixes

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* removed empty badge list

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* updated test

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* fixed tests pr 2

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* added constants file for partition column string

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* lint

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

Co-authored-by: feng-tao <fengtao04@gmail.com>
allisonsuarez and feng-tao committed Oct 13, 2020
1 parent acac5e8 commit 8beee3e
Showing 5 changed files with 162 additions and 60 deletions.
12 changes: 10 additions & 2 deletions databuilder/extractor/hive_table_metadata_extractor.py
@@ -8,6 +8,7 @@
from typing import Iterator, Union, Dict, Any

from databuilder import Scoped
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
@@ -101,8 +102,15 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:

for row in group:
last_row = row
columns.append(ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order']))
column = None
if row['is_partition_col'] == 1:
# add a badge to indicate a partition column
column = ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order'], [PARTITION_BADGE])
else:
column = ColumnMetadata(row['col_name'], row['col_description'],
row['col_type'], row['col_sort_order'])
columns.append(column)
is_view = last_row['is_view'] == 1
yield TableMetadata('hive', self._cluster,
last_row['schema'],
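
The new branch boils down to a small piece of logic. A minimal sketch of the idea, assuming row dicts shaped like the extractor's query results (the row values below are illustrative, not taken from the commit):

    from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
    from databuilder.models.table_metadata import ColumnMetadata

    # Illustrative row, shaped like the rows the SQLAlchemy extractor yields.
    row = {'col_name': 'ds', 'col_description': 'partition date',
           'col_type': 'varchar', 'col_sort_order': 5, 'is_partition_col': 1}

    # Partition columns get PARTITION_BADGE; all other columns get no badges.
    badges = [PARTITION_BADGE] if row['is_partition_col'] == 1 else None
    column = ColumnMetadata(row['col_name'], row['col_description'],
                            row['col_type'], row['col_sort_order'], badges)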
5 changes: 5 additions & 0 deletions databuilder/extractor/table_metadata_constants.py
@@ -0,0 +1,5 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

# String for partition column badge
PARTITION_BADGE = 'partition column'
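
Because BADGE_KEY_FORMAT in the new BadgeMetadata model below is just '{badge}', the node key for this constant is the badge string itself. A quick sketch:

    from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
    from databuilder.models.table_metadata import BadgeMetadata

    # BADGE_KEY_FORMAT is '{badge}', so the key equals the badge name.
    assert BadgeMetadata.get_badge_key(PARTITION_BADGE) == 'partition column'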
95 changes: 72 additions & 23 deletions databuilder/models/table_metadata.py
@@ -17,6 +17,55 @@
DESCRIPTION_NODE_LABEL = DESCRIPTION_NODE_LABEL_VAL


class BadgeMetadata(Neo4jCsvSerializable):
BADGE_NODE_LABEL = 'Badge'
BADGE_KEY_FORMAT = '{badge}'
BADGE_CATEGORY = 'category'
DASHBOARD_TYPE = 'dashboard'
METRIC_TYPE = 'metric'

def __init__(self,
name: str,
category: str,
):
self._name = name
self._category = category
self._nodes = iter([self.create_badge_node(self._name)])
self._relations: Iterator[Dict[str, Any]] = iter([])

def __repr__(self) -> str:
return 'BadgeMetadata({!r}, {!r})'.format(self._name,
self._category)

@staticmethod
def get_badge_key(name: str) -> str:
if not name:
return ''
return BadgeMetadata.BADGE_KEY_FORMAT.format(badge=name)

@staticmethod
def create_badge_node(name: str,
category: str = 'column',
) -> Dict[str, str]:
return {NODE_LABEL: BadgeMetadata.BADGE_NODE_LABEL,
NODE_KEY: BadgeMetadata.get_badge_key(name),
BadgeMetadata.BADGE_CATEGORY: category}

def create_next_node(self) -> Optional[Dict[str, Any]]:
# return the next badge node dict, or None when exhausted
try:
return next(self._nodes)
except StopIteration:
return None

def create_next_relation(self) -> Optional[Dict[str, Any]]:
# We don't emit any relations for Badge ingestion
try:
return next(self._relations)
except StopIteration:
return None


class TagMetadata(Neo4jCsvSerializable):
TAG_NODE_LABEL = 'Tag'
TAG_KEY_FORMAT = '{tag}'
@@ -150,16 +199,16 @@ class ColumnMetadata:
COLUMN_DESCRIPTION = 'description'
COLUMN_DESCRIPTION_FORMAT = '{db}://{cluster}.{schema}/{tbl}/{col}/{description_id}'

# Relation between column and tag
COL_TAG_RELATION_TYPE = 'TAGGED_BY'
TAG_COL_RELATION_TYPE = 'TAG'
# Relation between column and badge
COL_BADGE_RELATION_TYPE = 'HAS_BADGE'
BADGE_COL_RELATION_TYPE = 'BADGE_FOR'

def __init__(self,
name: str,
description: Union[str, None],
col_type: str,
sort_order: int,
tags: Union[List[str], None] = None
badges: Union[List[str], None] = None
) -> None:
"""
TODO: Add stats
@@ -173,13 +222,14 @@ def __init__(self,
text=description)
self.type = col_type
self.sort_order = sort_order
self.tags = tags
self.badges = badges

def __repr__(self) -> str:
return 'ColumnMetadata({!r}, {!r}, {!r}, {!r})'.format(self.name,
self.description,
self.type,
self.sort_order)
return 'ColumnMetadata({!r}, {!r}, {!r}, {!r}, {!r})'.format(self.name,
self.description,
self.type,
self.sort_order,
self.badges)


# Tuples for de-dupe purpose on Database, Cluster, Schema. See TableMetadata docstring for more information
@@ -313,7 +363,8 @@ def _get_col_key(self, col: ColumnMetadata) -> str:
cluster=self.cluster,
schema=self.schema,
tbl=self.name,
col=col.name)
col=col.name,
badges=col.badges)

def _get_col_description_key(self,
col: ColumnMetadata,
@@ -375,11 +426,9 @@ def _create_next_node(self) -> Iterator[Any]:  # noqa: C901
node_key = self._get_col_description_key(col, col.description)
yield col.description.get_node_dict(node_key)

if col.tags:
for tag in col.tags:
yield {NODE_LABEL: TagMetadata.TAG_NODE_LABEL,
NODE_KEY: TagMetadata.get_tag_key(tag),
TagMetadata.TAG_TYPE: 'default'}
if col.badges:
for badge in col.badges:
yield BadgeMetadata.create_badge_node(badge)

# Database, cluster, schema
others = [NodeTuple(key=self._get_database_key(),
@@ -450,15 +499,15 @@ def _create_next_relation(self) -> Iterator[Any]:
self._get_col_key(col),
self._get_col_description_key(col, col.description))

if col.tags:
for tag in col.tags:
if col.badges:
for badge in col.badges:
yield {
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_LABEL: TagMetadata.TAG_NODE_LABEL,
RELATION_START_KEY: self._get_table_key(),
RELATION_END_KEY: TagMetadata.get_tag_key(tag),
RELATION_TYPE: ColumnMetadata.COL_TAG_RELATION_TYPE,
RELATION_REVERSE_TYPE: ColumnMetadata.TAG_COL_RELATION_TYPE,
RELATION_START_LABEL: ColumnMetadata.COLUMN_NODE_LABEL,
RELATION_END_LABEL: BadgeMetadata.BADGE_NODE_LABEL,
RELATION_START_KEY: self._get_col_key(col),
RELATION_END_KEY: BadgeMetadata.get_badge_key(badge),
RELATION_TYPE: ColumnMetadata.COL_BADGE_RELATION_TYPE,
RELATION_REVERSE_TYPE: ColumnMetadata.BADGE_COL_RELATION_TYPE,
}

others = [
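
To make the serialization concrete: for a badged column, _create_next_node yields a badge node from BadgeMetadata.create_badge_node, and _create_next_relation yields a column-to-badge relation using the HAS_BADGE/BADGE_FOR types. A hedged sketch of the emitted node dict, assuming the serde constants NODE_LABEL and NODE_KEY resolve to 'LABEL' and 'KEY' as elsewhere in databuilder:

    from databuilder.models.table_metadata import BadgeMetadata

    node = BadgeMetadata.create_badge_node('partition column')
    # Expected shape, under the NODE_LABEL='LABEL' / NODE_KEY='KEY' assumption:
    # {'LABEL': 'Badge', 'KEY': 'partition column', 'category': 'column'}
    print(node)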
59 changes: 39 additions & 20 deletions tests/unit/extractor/test_hive_table_metadata_extractor.py
@@ -50,32 +50,38 @@ def test_extraction_with_single_result(self) -> None:
{'col_name': 'col_id1',
'col_type': 'bigint',
'col_description': 'description of id1',
'col_sort_order': 0}, table),
'col_sort_order': 0,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'col_id2',
'col_type': 'bigint',
'col_description': 'description of id2',
'col_sort_order': 1}, table),
'col_sort_order': 1,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'is_active',
'col_type': 'boolean',
'col_description': None,
'col_sort_order': 2}, table),
'col_sort_order': 2,
'is_partition_col': 1}, table),
self._union(
{'col_name': 'source',
'col_type': 'varchar',
'col_description': 'description of source',
'col_sort_order': 3}, table),
'col_sort_order': 3,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'etl_created_at',
'col_type': 'timestamp',
'col_description': 'description of etl_created_at',
'col_sort_order': 4}, table),
'col_sort_order': 4,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'ds',
'col_type': 'varchar',
'col_description': None,
'col_sort_order': 5}, table)
'col_sort_order': 5,
'is_partition_col': 0}, table)
]

extractor = HiveTableMetadataExtractor()
@@ -84,9 +90,10 @@ def test_extraction_with_single_result(self) -> None:
expected = TableMetadata('hive', 'gold', 'test_schema', 'test_table', 'a table for testing',
[ColumnMetadata('col_id1', 'description of id1', 'bigint', 0),
ColumnMetadata('col_id2', 'description of id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('is_active', None, 'boolean', 2, ['partition column']),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp',
4),
ColumnMetadata('ds', None, 'varchar', 5)],
is_view=False)
self.assertEqual(expected.__repr__(), actual.__repr__())
@@ -118,63 +125,75 @@ def test_extraction_with_multiple_result(self) -> None:
{'col_name': 'col_id1',
'col_type': 'bigint',
'col_description': 'description of col_id1',
'col_sort_order': 0}, table),
'col_sort_order': 0,
'is_partition_col': 1}, table),
self._union(
{'col_name': 'col_id2',
'col_type': 'bigint',
'col_description': 'description of col_id2',
'col_sort_order': 1}, table),
'col_sort_order': 1,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'is_active',
'col_type': 'boolean',
'col_description': None,
'col_sort_order': 2}, table),
'col_sort_order': 2,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'source',
'col_type': 'varchar',
'col_description': 'description of source',
'col_sort_order': 3}, table),
'col_sort_order': 3,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'etl_created_at',
'col_type': 'timestamp',
'col_description': 'description of etl_created_at',
'col_sort_order': 4}, table),
'col_sort_order': 4,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'ds',
'col_type': 'varchar',
'col_description': None,
'col_sort_order': 5}, table),
'col_sort_order': 5,
'is_partition_col': 0}, table),
self._union(
{'col_name': 'col_name',
'col_type': 'varchar',
'col_description': 'description of col_name',
'col_sort_order': 0}, table1),
'col_sort_order': 0,
'is_partition_col': 0}, table1),
self._union(
{'col_name': 'col_name2',
'col_type': 'varchar',
'col_description': 'description of col_name2',
'col_sort_order': 1}, table1),
'col_sort_order': 1,
'is_partition_col': 0}, table1),
self._union(
{'col_name': 'col_id3',
'col_type': 'varchar',
'col_description': 'description of col_id3',
'col_sort_order': 0}, table2),
'col_sort_order': 0,
'is_partition_col': 0}, table2),
self._union(
{'col_name': 'col_name3',
'col_type': 'varchar',
'col_description': 'description of col_name3',
'col_sort_order': 1}, table2)
'col_sort_order': 1,
'is_partition_col': 0}, table2)
]

extractor = HiveTableMetadataExtractor()
extractor.init(self.conf)

expected = TableMetadata('hive', 'gold', 'test_schema1', 'test_table1', 'test table 1',
[ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0),
[ColumnMetadata('col_id1', 'description of col_id1', 'bigint', 0,
['partition column']),
ColumnMetadata('col_id2', 'description of col_id2', 'bigint', 1),
ColumnMetadata('is_active', None, 'boolean', 2),
ColumnMetadata('source', 'description of source', 'varchar', 3),
ColumnMetadata('etl_created_at', 'description of etl_created_at', 'timestamp', 4),
ColumnMetadata('etl_created_at', 'description of etl_created_at',
'timestamp', 4),
ColumnMetadata('ds', None, 'varchar', 5)],
is_view=False)
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
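
The expected values above changed because these tests compare objects through __repr__, and ColumnMetadata.__repr__ now includes the badge list. A small sketch of the effect:

    from databuilder.models.table_metadata import ColumnMetadata

    plain = ColumnMetadata('ds', None, 'varchar', 5)
    badged = ColumnMetadata('ds', None, 'varchar', 5, ['partition column'])

    # __repr__ now includes badges, so repr-based assertions distinguish them.
    assert repr(plain) != repr(badged)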
