Skip to content

Commit

Permalink
feat: Add Frequent Users feature in [AtlasProxy] (#147)
Browse files Browse the repository at this point in the history
* 🎉 Initial commit.

* ♻️ Refactoring code.

* 👌 Updating code due to code review changes.

* 👌 Updating code due to code review changes. (tests)
  • Loading branch information
mgorsk1 committed Jul 13, 2020
1 parent 98794fa commit e80fcfd
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 8 deletions.
92 changes: 88 additions & 4 deletions metadata/metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from typing import Any, Dict, List, Union, Optional

from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader
from amundsen_common.models.user import User as UserEntity
from amundsen_common.models.dashboard import DashboardSummary
from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest
from atlasclient.models import EntityUniqueAttribute
from atlasclient.utils import (make_table_qualified_name,
parse_table_qualified_name)
parse_table_qualified_name,
extract_entities)
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from flask import current_app as app
Expand Down Expand Up @@ -41,6 +42,7 @@ class AtlasProxy(BaseProxy):
STATISTICS_FORMAT_SPEC = app.config['STATISTICS_FORMAT_SPEC']
BOOKMARK_TYPE = 'Bookmark'
USER_TYPE = 'User'
READER_TYPE = 'Reader'
QN_KEY = 'qualifiedName'
BOOKMARK_ACTIVE_KEY = 'active'
GUID_KEY = 'guid'
Expand Down Expand Up @@ -386,6 +388,7 @@ def get_table(self, *, table_uri: str) -> Table:
description=attrs.get('description') or attrs.get('comment'),
owners=[User(email=attrs.get('owner'))],
columns=columns,
table_readers=self._get_readers(attrs.get(self.QN_KEY)),
last_updated_timestamp=self._parse_date(table_details.get('updateTime')))

return table
Expand Down Expand Up @@ -596,8 +599,42 @@ def get_table_by_user_relation(self, *, user_email: str, relation_type: UserReso

return {'table': results}

def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, Any]:
pass
def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[PopularTable]]:
    """
    Return the tables read by the given user as ``PopularTable`` objects.

    The User entity is looked up by ``qualifiedName``. Every ``entityReads``
    relationship whose entity status and relationship status are both ``ACTIVE``
    is resolved to its Reader entity, and each Reader with a truthy ``count`` is
    converted into a ``PopularTable`` built from the Reader's entity URI.

    :param user_email: value of the ``qualifiedName`` attribute of the User entity
    :return: ``{'table': [...]}`` with tables ordered by ascending read count;
             the list is empty when the user has no active reads
    """
    user = self._driver.entity_unique_attribute(self.USER_TYPE, qualifiedName=user_email).entity

    # `entityReads` can be missing (or null) for a user that never read anything;
    # fall back to an empty list instead of iterating over None.
    user_reads = user['relationshipAttributes'].get('entityReads') or []

    readers_guids = [
        user_read['guid']
        for user_read in user_reads
        if user_read['entityStatus'] == 'ACTIVE' and user_read['relationshipStatus'] == 'ACTIVE'
    ]

    # Same guard as in `_get_readers`: do not issue a bulk request without guids.
    if not readers_guids:
        return {'table': []}

    readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True))

    # Collect (count, table) pairs rather than a dict keyed by count, so that two
    # tables with the same read count do not overwrite each other.
    counted_tables = []
    for reader in readers:
        count = reader.attributes.get('count')

        if not count:
            continue

        entity_uri = reader.attributes.get(self.ENTITY_URI_KEY)
        details = self._extract_info_from_uri(table_uri=entity_uri)

        counted_tables.append((count, PopularTable(cluster=details.get('cluster'),
                                                   name=details.get('name'),
                                                   schema=details.get('db'),
                                                   database=details.get('entity'))))

    # NOTE(review): ascending order is kept from the previous implementation;
    # confirm whether callers expect the most-read table first.
    # list.sort is stable, so tables with equal counts keep their retrieval order.
    counted_tables.sort(key=lambda counted: counted[0])

    return {'table': [table for _, table in counted_tables]}

def add_resource_relation_by_user(self, *,
id: str,
Expand Down Expand Up @@ -652,6 +689,53 @@ def _parse_date(self, date: int) -> Optional[int]:
except Exception:
return None

def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Reader]:
    """
    Fetch the readers of a table from Atlas.

    A basic search is run for Reader entities whose qualified name starts with the
    part of ``qualified_name`` preceding '@' and whose ``count`` is at least
    ``POPULAR_TABLE_MINIMUM_READER_COUNT``, sorted by ``count`` in descending order.
    The entities found are then loaded in bulk (with relationships) so the related
    user can be read.

    :param qualified_name: qualified name of the table
    :param top: value sent as the search ``limit``
    :return: one Reader per entity returned by the bulk request, or an empty list
             when the search yields no entities
    """
    name_criterion = {
        'attributeName': self.QN_KEY,
        'operator': 'STARTSWITH',
        'attributeValue': qualified_name.split('@')[0]
    }
    count_criterion = {
        'attributeName': 'count',
        'operator': 'gte',
        'attributeValue': f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}'
    }
    search_query = {
        'typeName': self.READER_TYPE,
        'offset': '0',
        'limit': top,
        'excludeDeletedEntities': True,
        'entityFilters': {
            'condition': 'AND',
            'criterion': [name_criterion, count_criterion]
        },
        'attributes': ['count', self.QN_KEY],
        'sortBy': 'count',
        'sortOrder': 'DESCENDING'
    }

    found = self._driver.search_basic.create(data=search_query, ignoreRelationships=False)
    reader_guids = [entity.guid for entity in found.entities]

    # Without any guid there is nothing to load in bulk.
    if not reader_guids:
        return []

    bulk_response = self._driver.entity_bulk(guid=reader_guids, ignoreRelationships=False)

    readers = []
    for entity in extract_entities(bulk_response):
        # The user's display text is used both as e-mail and as user id.
        user_name = entity.relationshipAttributes['user']['displayText']
        readers.append(Reader(user=User(email=user_name, user_id=user_name),
                              read_count=entity.attributes['count']))

    return readers

def get_dashboard(self,
dashboard_uri: str,
) -> DashboardDetailEntity:
Expand Down
2 changes: 1 addition & 1 deletion metadata/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ neotime==1.7.1
pytz==2018.4
requests-aws4auth==0.9
statsd==3.2.1
pyatlasclient==1.0.3
pyatlasclient==1.0.4
beaker>=1.10.0
mocket==3.7.3
overrides==2.5
Expand Down
86 changes: 86 additions & 0 deletions metadata/tests/unit/proxy/fixtures/atlas_test_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import copy


class DottedDict(dict):
    """Dictionary whose keys can also be read, written and deleted as attributes."""

    def __getattr__(self, key, default=None):
        # Only invoked when regular attribute lookup fails; a missing key yields
        # ``default`` (None) rather than raising.
        return dict.get(self, key, default)

    def __setattr__(self, key, value):
        # Attribute assignment stores the value as a dictionary item.
        dict.__setitem__(self, key, value)

    def __delattr__(self, key):
        # Attribute deletion removes the dictionary item (KeyError when absent).
        dict.__delitem__(self, key)


class Data:
entity_type = 'hive_table'
column_type = 'hive_column'
Expand Down Expand Up @@ -166,3 +173,82 @@ class Data:
bookmark_entity2,
]
}

user_entity_1 = {
"typeName": "User",
"attributes": {
"qualifiedName": "test_user_1"
},
"guid": "",
"status": "ACTIVE",
"displayText": 'test_user_1',
"classificationNames": [],
"meaningNames": [],
"meanings": []
}

user_entity_2 = {
"typeName": "User",
"attributes": {
"qualifiedName": "test_user_2"
},
"guid": "",
"status": "ACTIVE",
"displayText": 'test_user_2',
"classificationNames": [],
"meaningNames": [],
"meanings": [],
"relationshipAttributes": {
"entityReads": [
{
"entityStatus": "ACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "1"
},
{
"entityStatus": "INACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "2"
},
{
"entityStatus": "ACTIVE",
"relationshipStatus": "INACTIVE",
"guid": "3"
}
]
}
}

# Reader fixture: 5 reads of Table1 by test_user_1.
reader_entity_1 = {
    "typeName": "Reader",
    "attributes": {
        "count": 5,
        "qualifiedName": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_1', cluster),
        "entityUri": f"hive_table://{cluster}.{db}/Table1",
    },
    "guid": "1",
    "status": "ACTIVE",
    # displayText is kept identical to qualifiedName, as in reader_entity_2
    # (it was previously built with 'test_user' instead of 'test_user_1').
    "displayText": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_1', cluster),
    "classificationNames": [],
    "meaningNames": [],
    "meanings": [],
    "relationshipAttributes": {"user": user_entity_1}
}

reader_entity_2 = {
"typeName": "Reader",
"attributes": {
"count": 150,
"qualifiedName": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_2', cluster),
"entityUri": f"hive_table://{cluster}.{db}/Table1",
},
"guid": "2",
"status": "ACTIVE",
"displayText": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_2', cluster),
"classificationNames": [],
"meaningNames": [],
"meanings": [],
"relationshipAttributes": {"user": user_entity_2}
}

reader_entities = [DottedDict(reader_entity) for reader_entity in [reader_entity_1, reader_entity_2]]
43 changes: 40 additions & 3 deletions metadata/tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import copy
import unittest
from typing import Any, Dict, Optional, cast
from typing import Any, Dict, Optional, cast, List

from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader
from atlasclient.exceptions import BadRequest
from mock import MagicMock, patch
from tests.unit.proxy.fixtures.atlas_test_data import Data
from tests.unit.proxy.fixtures.atlas_test_data import Data, DottedDict

from metadata_service import create_app
from metadata_service.entity.tag_detail import TagDetail
Expand Down Expand Up @@ -303,6 +303,43 @@ def test_delete_resource_relation_by_user(self) -> None:
resource_type=ResourceType.Table)
mock_execute.assert_called_with()

def test_get_readers(self) -> None:
    """_get_readers should turn each Reader entity returned by the driver into a Reader model."""
    # The basic search and the bulk lookup both hand back the reader fixtures.
    search_response = MagicMock()
    search_response.entities = self.reader_entities
    self.proxy._driver.search_basic.create = MagicMock(return_value=search_response)

    bulk_collection = MagicMock()
    bulk_collection.entities = self.reader_entities
    self.proxy._driver.entity_bulk = MagicMock(return_value=[bulk_collection])

    expected: List[Reader] = [
        Reader(user=User(email='test_user_1', user_id='test_user_1'), read_count=5),
        Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150),
    ]

    actual = self.proxy._get_readers('dummy', 1)

    self.assertEqual(actual, expected)

def test_get_frequently_used_tables(self) -> None:
    """get_frequently_used_tables should return a PopularTable for the reader entity served by the driver."""
    # User lookup returns the user fixture that carries `entityReads` relationships.
    user_lookup = MagicMock()
    user_lookup.entity = DottedDict(self.user_entity_2)
    self.proxy._driver.entity_unique_attribute = MagicMock(return_value=user_lookup)

    # Bulk lookup returns a single reader entity.
    bulk_collection = MagicMock()
    bulk_collection.entities = [DottedDict(self.reader_entity_1)]
    self.proxy._driver.entity_bulk = MagicMock(return_value=[bulk_collection])

    actual = self.proxy.get_frequently_used_tables(user_email='dummy')

    expected_table = PopularTable(cluster=self.cluster,
                                  name='Table1',
                                  schema=self.db,
                                  database=self.entity_type)

    self.assertEqual({'table': [expected_table]}, actual)


if __name__ == '__main__':
unittest.main()

0 comments on commit e80fcfd

Please sign in to comment.