atlas_search_extractor | 🎉 Initial commit.
Signed-off-by: mgorsk1 <gorskimariusz13@gmail.com>
mgorsk1 committed Dec 1, 2020
1 parent 2343a90 commit 0604f8f
Showing 3 changed files with 288 additions and 0 deletions.
204 changes: 204 additions & 0 deletions databuilder/extractor/atlas_search_data_extractor.py
@@ -0,0 +1,204 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import importlib
import logging
from copy import deepcopy
from functools import reduce
from typing import Iterator, Optional, List, Tuple, Any, Dict

from atlasclient.client import Atlas
from atlasclient.utils import extract_entities
from pyhocon import ConfigTree, ConfigFactory

from databuilder.extractor.base_extractor import Extractor

LOGGER = logging.getLogger(__name__)

# custom types
type_fields_mapping_spec = Dict[str, List[Tuple[str, str, Any, Any]]]
type_fields_mapping = List[Tuple[str, str, Any, Any]]

# @todo document classes/methods
# @todo write tests


class AtlasSearchDataExtractor(Extractor):
ATLAS_URL_CONFIG_KEY = 'atlas_url'
ATLAS_PORT_CONFIG_KEY = 'atlas_port'
ATLAS_PROTOCOL_CONFIG_KEY = 'atlas_protocol'
ATLAS_VALIDATE_SSL_CONFIG_KEY = 'atlas_validate_ssl'
ATLAS_USERNAME_CONFIG_KEY = 'atlas_auth_user'
ATLAS_PASSWORD_CONFIG_KEY = 'atlas_auth_pw'
ATLAS_BATCH_CHUNK_SIZE_KEY = 'atlas_batch_chunk_size'
ATLAS_TIMEOUT_SECONDS_KEY = 'atlas_timeout_seconds'

ENTITY_TYPE_KEY = 'entity_type'

DEFAULT_QUERY_PARAMS_BY_ENTITY = {
'Table': {
'typeName': 'Table',
'excludeDeletedEntities': True,
'query': '*'
}
}

DEFAULT_CONFIG = ConfigFactory.from_dict({ATLAS_URL_CONFIG_KEY: "localhost",
ATLAS_PORT_CONFIG_KEY: 21000,
ATLAS_PROTOCOL_CONFIG_KEY: 'http',
ATLAS_VALIDATE_SSL_CONFIG_KEY: False,
ATLAS_BATCH_CHUNK_SIZE_KEY: 100,
ATLAS_TIMEOUT_SECONDS_KEY: 60})

# @todo fill out below fields for TableESDocument
# tags: List[str],

# es_document field, atlas field path, modification function, default_value
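    # e.g. the ('cluster', 'attributes.qualifiedName', lambda x: x.split('@')[-1], None)
    # entry below takes an Atlas qualified name such as 'db.table@cluster' and
    # keeps only the part after the '@' as the cluster name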
FIELDS_MAPPING_SPEC: type_fields_mapping_spec = {
'Table': [
('database', 'typeName', None, None),
('cluster', 'attributes.qualifiedName', lambda x: x.split('@')[-1], None),
('schema', 'relationshipAttributes.db.displayText', None, None),
('name', 'attributes.name', None, None),
('key', 'attributes.qualifiedName', None, None),
('description', 'attributes.description', None, None),
            ('last_updated_timestamp', 'updateTime', lambda x: int(int(x) / 1000), 0),
('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
('unique_usage', 'attributes.uniqueUsage', lambda x: int(x), 1),
            ('column_names', 'relationshipAttributes.columns',
             lambda x: [c.get('attributes').get('name') for c in x if c.get('status').lower() == 'active'], []),
            ('column_descriptions', 'relationshipAttributes.columns',
             lambda x: [c.get('attributes').get('description') for c in x if c.get('status').lower() == 'active'], []),
('tags', 'tags', None, []),
('badges', 'classifications',
lambda x: [c.get('typeName') for c in x if c.get('entityStatus', '').lower() == 'active'], []),
('display_name', 'attributes.qualifiedName', lambda x: x.split('@')[0], None),
('schema_description', 'attributes.parameters.sourceDescription', None, None),
('programmatic_descriptions', 'attributes.parameters', lambda x: [str(s) for s in list(x.values())], None)
]
}

ENTITY_MODEL_BY_TYPE = {
'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument'
}

    # relationship attributes hydrated through entity_bulk in _execute_query,
    # so fields mapped from them (e.g. column names) are actually populated
    REQUIRED_RELATIONSHIPS_BY_TYPE = {
        'Table': ['columns']
    }

def init(self, conf: ConfigTree) -> None:
self.conf = conf.with_fallback(AtlasSearchDataExtractor.DEFAULT_CONFIG)
self.driver = self._get_driver()

self._extract_iter: Optional[Iterator[Any]] = None

@property
def entity_type(self) -> str:
return self.conf.get(AtlasSearchDataExtractor.ENTITY_TYPE_KEY)

@property
def search_query(self) -> Dict:
return AtlasSearchDataExtractor.DEFAULT_QUERY_PARAMS_BY_ENTITY.get(self.entity_type) or {}

@property
def model_class(self) -> Any:
model_class = AtlasSearchDataExtractor.ENTITY_MODEL_BY_TYPE.get(self.entity_type)

if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)

return getattr(mod, class_name)

@property
def field_mappings(self) -> type_fields_mapping:
return AtlasSearchDataExtractor.FIELDS_MAPPING_SPEC.get(self.entity_type) or []

def extract(self) -> Any:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()

try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self) -> str:
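        # this scope prefixes every config key for the extractor,
        # e.g. 'extractor.atlas_search_data.atlas_url'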
return 'extractor.atlas_search_data'

def _get_driver(self) -> Any:
return Atlas(host=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY),
port=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY),
username=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY),
password=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY),
protocol=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY),
validate_ssl=self.conf.get_bool(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY),
timeout=self.conf.get_int(AtlasSearchDataExtractor.ATLAS_TIMEOUT_SECONDS_KEY))

    def _execute_query(self, params: Dict, relationships: Optional[List[str]] = None) -> Iterator[Any]:
offset = 0
chunk_size = self.conf.get_int(AtlasSearchDataExtractor.ATLAS_BATCH_CHUNK_SIZE_KEY)

while True:
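            # page through basic search results: each call returns at most
            # 'limit' entities, so advance the offset until a chunk comes back empty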
_params = {'offset': str(offset), 'limit': str(chunk_size)}

full_params = deepcopy(params)
full_params.update(**_params)

results = self.driver.search_basic(**full_params)

search_chunk = extract_entities(results)

if relationships:
guids = [table.guid for table in search_chunk]

if guids:
bulk_collection = self.driver.entity_bulk(guid=guids)

                    # gather entities from every collection returned by the bulk
                    # call so earlier results are not overwritten on each pass
                    entities_with_relationships: List[Any] = []

                    for collection in bulk_collection:
                        entities_with_relationships += list(
                            collection.entities_with_relationships(attributes=relationships))

                    search_chunk = entities_with_relationships

for table in search_chunk:
yield table

if len(search_chunk) == 0:
break

offset = offset + chunk_size

def _get_extract_iter(self) -> Iterator[Any]:
relationships = AtlasSearchDataExtractor.REQUIRED_RELATIONSHIPS_BY_TYPE.get(self.entity_type)

for atlas_entity in self._execute_query(self.search_query, relationships=relationships):
model_dict = dict()

data = atlas_entity.__dict__['_data']

for spec in self.field_mappings:
model_field, atlas_field_path, _transform_spec, default_value = spec

                # walk the dotted path: e.g. 'relationshipAttributes.db.displayText'
                # reads data['relationshipAttributes']['db']['displayText']; a missing
                # key yields {} so the chain never raises, and empty results fall
                # back to default_value
                atlas_value = reduce(lambda x, y: x.get(y, dict()), atlas_field_path.split('.'), data) or default_value

transform_spec = _transform_spec or (lambda x: x)

try:
es_entity_value = transform_spec(atlas_value)
model_dict[model_field] = es_entity_value

except Exception:
                    LOGGER.warning(
                        f'Error processing entity. model_field: {model_field} | atlas_field_path: {atlas_field_path}',
                        exc_info=True)

try:
result = self.model_class(**model_dict)

yield result
except Exception:
                LOGGER.warning('Error building model object.', exc_info=True)
82 changes: 82 additions & 0 deletions example/scripts/sample_atlas_search_extractor.py
@@ -0,0 +1,82 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.atlas_search_data_extractor import AtlasSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

entity_type = 'Table'
extracted_search_data_path = '/tmp/search_data.json'
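# the filesystem loader writes extracted documents to this file ('w' mode below);
# the Elasticsearch publisher then reads it back ('r' mode) and indexes the records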

# atlas config
atlas_url = 'localhost'
atlas_port = 21000
atlas_protocol = 'http'
atlas_verify_ssl = False
atlas_username = 'admin'
atlas_password = 'admin'
atlas_batch_chunk_size = 50

# elastic config
es = Elasticsearch([
{'host': 'localhost'},
])
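# elasticsearch-py connects to port 9200 by default when none is specified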

elasticsearch_client = es
elasticsearch_new_index_key = 'tables-' + str(uuid.uuid4())
elasticsearch_new_index_key_type = 'table'
elasticsearch_index_alias = 'table_search_index'

job_config = ConfigFactory.from_dict({
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY):
atlas_url,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY):
atlas_port,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY):
atlas_protocol,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY):
atlas_verify_ssl,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY):
atlas_username,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY):
atlas_password,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_BATCH_CHUNK_SIZE_KEY):
atlas_batch_chunk_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ENTITY_TYPE_KEY):
entity_type,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias
})

if __name__ == "__main__":
task = DefaultTask(extractor=AtlasSearchDataExtractor(),
transformer=NoopTransformer(),
loader=FSElasticsearchJSONLoader())

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())

job.launch()
2 changes: 2 additions & 0 deletions requirements.txt
@@ -57,3 +57,5 @@ pandas>=0.21.0,<1.2.0

requests==2.23.0,<3.0
responses==0.10.6

pyatlasclient==1.1.1
