Skip to content

Commit

Permalink
Implements the column statistics for atlas proxy (#52)
Browse files Browse the repository at this point in the history
* Implements the column statistics for atlas proxy

* Adds the documentation on how to setup column statistics
  • Loading branch information
verdan authored and Hans Adriaans committed Jun 30, 2022
1 parent e583780 commit 69b1b90
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 23 deletions.
134 changes: 134 additions & 0 deletions metadata/docs/proxy/atlas/column_configs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Atlas Columns Configuration

Create a new atlas client instance. (update the host and credentials information)
```python
from atlasclient.client import Atlas
client = Atlas(host='localhost', port=21000, username='admin', password='admin')
```

## Setting up column statistics
In order to store the statistics for each column, Amundsen expects you to use Atlas' Structure definition,
which will hold all the statistics as an attribute `stats` on each column.

First, you need to create Structure definition within atlas:

```python
# Change this if you want to name your Structure differently
COLUMN_STATS_ENTITY = 'col_stat'

# Make the structure definition dictionary
struct_def_dict = {
"structDefs": [
{
"name": COLUMN_STATS_ENTITY,
"attributeDefs": [
{
"name": "stat_name",
"typeName": "string",
"isOptional": False,
"cardinality": "SINGLE",
"isUnique": True,
"isIndexable": False
},
{
"name": "stat_val",
"typeName": "double",
"isOptional": False,
"cardinality": "SINGLE",
"isUnique": False,
"isIndexable": False
},
{
"name": "start_epoch",
"typeName": "date",
"isOptional": False,
"cardinality": "SINGLE",
"isUnique": False,
"isIndexable": False
},
{
"name": "end_epoch",
"typeName": "date",
"isOptional": False,
"cardinality": "SINGLE",
"isUnique": False,
"isIndexable": False
}
]
}]
}

# Create the new definition within Atlas
client.typedefs.create(data=struct_def_dict)
```

The next step is to assign the newly created structure as an attribute to all the column type entities in Atlas.

```python
# Create the entity definition for all the columns available within atlas
column_entity_defs = []
for t in client.typedefs:
for col in t.entityDefs:
# If the type name ends with '_column' then add a new attribute 'stats'
# and 'stats' would be an array of 'COLUMN_STATS_ENTITY'

# You can have your own logic of assigning specific columns with this attribute here.
if col.name.endswith('_column'):
entity_dict = col._data
stats_attribute = {
"name": "stats",
"typeName": "array<{col_entity}>".format(col_entity=COLUMN_STATS_ENTITY),
"isOptional": True,
"cardinality": "LIST",
"isUnique": False,
"isIndexable": False,
"includeInNotification": False
}
entity_dict["attributeDefs"].append(stats_attribute)
column_entity_defs.append(entity_dict)

# Make the entity definition dictionary
col_def_dict = {
"entityDefs": column_entity_defs
}

# Update the column definitions.
client.typedefs.update(data=col_def_dict)
```

Once done, the columns in atlas should have the new attribute `stats`, which you can use to store your column statistics.

*Sample statistics:*
```python
{
"typeName": "col_stat",
"attributes": {
"stat_name": "max",
"stat_val": 12.0,
"end_epoch": 1560260088,
"start_epoch": 1560260088
}
},
{
"typeName": "col_stat",
"attributes": {
"stat_name": "mean",
"stat_val": 2.2,
"end_epoch": 1560260088,
"start_epoch": 1560260088
}
},
{
"typeName": "col_stat",
"attributes": {
"stat_name": "min",
"stat_val": 3.0,
"end_epoch": 1560260088,
"start_epoch": 1560260088
}
},
```




14 changes: 8 additions & 6 deletions metadata/docs/proxy/atlas_proxy.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Atlas Proxy Configurations

Create a new atlas client instance. (update the host and credentials information)
```python
from atlasclient.client import Atlas
Expand Down Expand Up @@ -28,12 +30,6 @@ typedef_dict = {
client.typedefs.create(data=typedef_dict)
```

### Add required fields
We need to add some extra fields to atlas in order to get all the information needed for the amundsen frontend.
Adding those extra attributes in the super type entity definition would be handy to keep them in once place.

[TBD - How to add attributes definition]

### Assign superType to entity definitions
Assign newly created TABLE_ENTITY entity as super type to the entity definitions you want to behave like tables.
in the code snippet below, `'hive_table' and 'rdbms_table'` would be affected.
Expand All @@ -55,3 +51,9 @@ typedef_dict = {
}
client.typedefs.update(data=typedef_dict)
```

### Add required fields
We need to add some extra fields to atlas in order to get all the information needed for the amundsen frontend.
Following are the details (along with the code) for each item:

- [Column Configurations](/docs/proxy/atlas/column_configs.md)
53 changes: 39 additions & 14 deletions metadata/metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from metadata_service.entity.tag_detail import TagDetail

from metadata_service.entity.popular_table import PopularTable
from metadata_service.entity.table_detail import Table, User, Tag, Column
from metadata_service.entity.table_detail import Table, User, Tag, Column, Statistics
from metadata_service.entity.user_detail import User as UserEntity
from metadata_service.exception import NotFoundException
from metadata_service.proxy import BaseProxy
Expand Down Expand Up @@ -146,6 +146,43 @@ def _get_column(self, *, table_uri: str, column_name: str) -> Dict:
LOGGER.exception(f'Column not found: {str(ex)}')
raise NotFoundException(f'Column not found: {column_name}')

def _serialize_columns(self, *, entity: EntityUniqueAttribute) -> \
        Union[List[Column], List]:
    """
    Helper function to fetch the columns from entity and serialize them
    using the Column and Statistics models.

    :param entity: EntityUniqueAttribute object,
        along with relationshipAttributes
    :return: A list of Column objects, if there are any columns available,
        else an empty list.
    """
    columns = []
    # 'columns' may be absent or explicitly None; 'or []' covers both.
    for column in entity.entity[self.REL_ATTRS_KEY].get('columns') or []:
        col_entity = entity.referredEntities[column['guid']]
        col_attrs = col_entity[self.ATTRS_KEY]

        # Serialize each raw stats dict into a Statistics model.
        statistics = [
            Statistics(
                stat_type=stats['attributes'].get('stat_name'),
                stat_val=stats['attributes'].get('stat_val'),
                start_epoch=stats['attributes'].get('start_epoch'),
                end_epoch=stats['attributes'].get('end_epoch'),
            )
            for stats in col_attrs.get('stats') or []
        ]

        columns.append(
            Column(
                name=col_attrs.get(self.NAME_ATTRIBUTE),
                description=col_attrs.get('description'),
                # Entity types differ: some expose 'type', others 'dataType'.
                col_type=col_attrs.get('type') or col_attrs.get('dataType'),
                sort_order=col_attrs.get('position'),
                stats=statistics,
            )
        )
    return columns

def get_user_detail(self, *, user_id: str) -> Union[UserEntity, None]:
pass

Expand All @@ -161,7 +198,6 @@ def get_table(self, *, table_uri: str) -> Table:

try:
attrs = table_details[self.ATTRS_KEY]
rel_attrs = table_details[self.REL_ATTRS_KEY]

tags = []
# Using or in case, if the key 'classifications' is there with a None
Expand All @@ -173,18 +209,7 @@ def get_table(self, *, table_uri: str) -> Table:
)
)

columns = []
for column in rel_attrs.get('columns') or list():
col_entity = entity.referredEntities[column['guid']]
col_attrs = col_entity[self.ATTRS_KEY]
columns.append(
Column(
name=col_attrs.get(self.NAME_ATTRIBUTE),
description=col_attrs.get('description'),
col_type=col_attrs.get('type') or col_attrs.get('dataType'),
sort_order=col_attrs.get('position'),
)
)
columns = self._serialize_columns(entity=entity)

table = Table(database=table_info['entity'],
cluster=table_info['cluster'],
Expand Down
33 changes: 30 additions & 3 deletions metadata/tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from metadata_service import create_app
from metadata_service.entity.popular_table import PopularTable
from metadata_service.entity.table_detail import (Table, User, Tag, Column)
from metadata_service.entity.table_detail import (Table, User, Tag, Column, Statistics)
from metadata_service.entity.tag_detail import TagDetail
from metadata_service.exception import NotFoundException

Expand Down Expand Up @@ -41,7 +41,21 @@ def setUp(self):
'qualifiedName': 'column@name',
'type': 'Managed',
'description': 'column description',
'position': 1
'position': 1,
'stats': [
{'attributes': {
'stat_name': 'max',
'stat_val': '100',
'start_epoch': '100',
'end_epoch': '200',
}},
{'attributes': {
'stat_name': 'min',
'stat_val': '0',
'start_epoch': '100',
'end_epoch': '200',
}},
]
}

}
Expand Down Expand Up @@ -155,6 +169,7 @@ def test_get_rel_attributes_dict(self):
rel_attr_collection.entities = [db_entity]

self.proxy._driver.entity_bulk = MagicMock(return_value=[rel_attr_collection])
# noinspection PyTypeChecker
response = self.proxy._get_rel_attributes_dict(entities=[entity1, entity2],
attribute='db')
expected = {
Expand Down Expand Up @@ -184,10 +199,22 @@ def test_get_table(self):
ent_attrs = self.entity1['attributes']

col_attrs = self.test_column['attributes']
exp_col_stats = list()

for stats in col_attrs['stats']:
exp_col_stats.append(
Statistics(
stat_type=stats['attributes']['stat_name'],
stat_val=stats['attributes']['stat_val'],
start_epoch=stats['attributes']['start_epoch'],
end_epoch=stats['attributes']['end_epoch'],
)
)
exp_col = Column(name=col_attrs['qualifiedName'],
description='column description',
col_type='Managed',
sort_order=col_attrs['position'])
sort_order=col_attrs['position'],
stats=exp_col_stats)
expected = Table(database=self.entity_type,
cluster=self.cluster,
schema=self.db,
Expand Down

0 comments on commit 69b1b90

Please sign in to comment.