From 8c633072da81b7fb2e343aa31e6e7f9c3bfae07d Mon Sep 17 00:00:00 2001
From: mgorsk1
Date: Sun, 20 Dec 2020 06:59:05 +0100
Subject: [PATCH] feat: atlas_search_extractor | :tada: Initial commit. (#415)

Signed-off-by: mgorsk1
---
 .../extractor/atlas_search_data_extractor.py | 311 ++++++++++++++++++
 .../scripts/sample_atlas_search_extractor.py |  88 +++++
 requirements.txt                             |   2 +
 setup.py                                     |   8 +-
 4 files changed, 408 insertions(+), 1 deletion(-)
 create mode 100644 databuilder/extractor/atlas_search_data_extractor.py
 create mode 100644 example/scripts/sample_atlas_search_extractor.py

diff --git a/databuilder/extractor/atlas_search_data_extractor.py b/databuilder/extractor/atlas_search_data_extractor.py
new file mode 100644
index 000000000..fb2e5c5a1
--- /dev/null
+++ b/databuilder/extractor/atlas_search_data_extractor.py
@@ -0,0 +1,311 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
+
+import importlib
+import logging
+import multiprocessing.pool
+from copy import deepcopy
+from functools import reduce
+from typing import (
+    Any, Dict, Generator, Iterator, List, Optional, Tuple,
+)
+
+from atlasclient.client import Atlas
+from pyhocon import ConfigFactory, ConfigTree
+
+from databuilder.extractor.base_extractor import Extractor
+
+LOGGER = logging.getLogger(__name__)
+
+# custom types
+type_fields_mapping_spec = Dict[str, List[Tuple[str, str, Any, Any]]]
+type_fields_mapping = List[Tuple[str, str, Any, Any]]
+
+# @todo document classes/methods
+# @todo write tests
+
+__all__ = ['AtlasSearchDataExtractor']
+
+
+class AtlasSearchDataExtractorHelpers:
+    @staticmethod
+    def _filter_none(input_list: List) -> List:
+        return list(filter(None, input_list))
+
+    @staticmethod
+    def get_column_names(column_list: List) -> List:
+        return AtlasSearchDataExtractorHelpers._filter_none(
+            [c.get('attributes', dict()).get('name') for c in column_list
+             if c.get('status', '').lower() == 'active'])
+
+    @staticmethod
+    def get_column_descriptions(column_list: List) -> List:
+        return AtlasSearchDataExtractorHelpers._filter_none(
+            [c.get('attributes', dict()).get('description') for c in column_list
+             if c.get('status', '').lower() == 'active'])
+
+    @staticmethod
+    def get_badges_from_classifications(classifications: List) -> List:
+        return AtlasSearchDataExtractorHelpers._filter_none(
+            [c.get('typeName') for c in classifications if c.get('entityStatus', '').lower() == 'active'])
+
+
+class AtlasSearchDataExtractor(Extractor):
+    ATLAS_URL_CONFIG_KEY = 'atlas_url'
+    ATLAS_PORT_CONFIG_KEY = 'atlas_port'
+    ATLAS_PROTOCOL_CONFIG_KEY = 'atlas_protocol'
+    ATLAS_VALIDATE_SSL_CONFIG_KEY = 'atlas_validate_ssl'
+    ATLAS_USERNAME_CONFIG_KEY = 'atlas_auth_user'
+    ATLAS_PASSWORD_CONFIG_KEY = 'atlas_auth_pw'
+    ATLAS_SEARCH_CHUNK_SIZE_KEY = 'atlas_search_chunk_size'
+    ATLAS_DETAILS_CHUNK_SIZE_KEY = 'atlas_details_chunk_size'
+    ATLAS_TIMEOUT_SECONDS_KEY = 'atlas_timeout_seconds'
+    ATLAS_MAX_RETRIES_KEY = 'atlas_max_retries'
+
+    PROCESS_POOL_SIZE_KEY = 'process_pool_size'
+
+    ENTITY_TYPE_KEY = 'entity_type'
+
+    DEFAULT_CONFIG = ConfigFactory.from_dict({ATLAS_URL_CONFIG_KEY: "localhost",
+                                              ATLAS_PORT_CONFIG_KEY: 21000,
+                                              ATLAS_PROTOCOL_CONFIG_KEY: 'http',
+                                              ATLAS_VALIDATE_SSL_CONFIG_KEY: False,
+                                              ATLAS_SEARCH_CHUNK_SIZE_KEY: 250,
+                                              ATLAS_DETAILS_CHUNK_SIZE_KEY: 25,
+                                              ATLAS_TIMEOUT_SECONDS_KEY: 120,
+                                              ATLAS_MAX_RETRIES_KEY: 2,
+                                              PROCESS_POOL_SIZE_KEY: 10})
+
+    # @todo fill out below fields for TableESDocument
+    # tags: List[str],
+
+    # es_document field, atlas field path, modification function, default_value
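+    # Each entry below is applied as: take the value under `atlas field path`
+    # (a dot-separated path into the raw entity dict), fall back to
+    # `default_value` when it is missing, then run it through
+    # `modification function` (if any). For example, a table whose
+    # qualifiedName is 'db1.table1@cl1' (a typical Atlas qualified name)
+    # would yield cluster 'cl1' and display_name 'db1.table1'.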
+    FIELDS_MAPPING_SPEC: type_fields_mapping_spec = {
+        'Table': [
+            ('database', 'typeName', None, None),
+            ('cluster', 'attributes.qualifiedName', lambda x: x.split('@')[-1], None),
+            ('schema', 'relationshipAttributes.db.displayText', None, None),
+            ('name', 'attributes.name', None, None),
+            ('key', 'attributes.qualifiedName', None, None),
+            ('description', 'attributes.description', None, None),
+            ('last_updated_timestamp', 'updateTime', lambda x: int(x) / 1000, 0),
+            ('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
+            ('unique_usage', 'attributes.uniqueUsage', lambda x: int(x), 1),
+            ('column_names', 'relationshipAttributes.columns',
+             lambda x: AtlasSearchDataExtractorHelpers.get_column_names(x), []),
+            ('column_descriptions', 'relationshipAttributes.columns',
+             lambda x: AtlasSearchDataExtractorHelpers.get_column_descriptions(x), []),
+            ('tags', 'tags', None, []),
+            ('badges', 'classifications',
+             lambda x: AtlasSearchDataExtractorHelpers.get_badges_from_classifications(x), []),
+            ('display_name', 'attributes.qualifiedName', lambda x: x.split('@')[0], None),
+            ('schema_description', 'attributes.parameters.sourceDescription', None, None),
+            ('programmatic_descriptions', 'attributes.parameters', lambda x: [str(s) for s in list(x.values())], {})
+        ]
+    }
+
+    ENTITY_MODEL_BY_TYPE = {
+        'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument'
+    }
+
+    REQUIRED_RELATIONSHIPS_BY_TYPE = {
+        'Table': ['columns']
+    }
+
+    def init(self, conf: ConfigTree) -> None:
+        self.conf = conf.with_fallback(AtlasSearchDataExtractor.DEFAULT_CONFIG)
+        self.driver = self._get_driver()
+
+        self._extract_iter: Optional[Iterator[Any]] = None
+
+    @property
+    def entity_type(self) -> str:
+        return self.conf.get(AtlasSearchDataExtractor.ENTITY_TYPE_KEY)
+
+    @property
+    def basic_search_query(self) -> Dict:
+        query = {
+            'typeName': self.entity_type,
+            'excludeDeletedEntities': True,
+            'query': '*'
+        }
+
+        LOGGER.debug(f'Basic Search Query: {query}')
+
+        return query
+
+    @property
+    def dsl_search_query(self) -> Dict:
+        query = {
+            'query': f'{self.entity_type} where __state = "ACTIVE"'
+        }
+
+        LOGGER.debug(f'DSL Search Query: {query}')
+
+        return query
+
+    @property
+    def model_class(self) -> Any:
+        model_class = AtlasSearchDataExtractor.ENTITY_MODEL_BY_TYPE.get(self.entity_type)
+
+        if model_class:
+            module_name, class_name = model_class.rsplit(".", 1)
+            mod = importlib.import_module(module_name)
+
+            return getattr(mod, class_name)
+
+    @property
+    def field_mappings(self) -> type_fields_mapping:
+        return AtlasSearchDataExtractor.FIELDS_MAPPING_SPEC.get(self.entity_type) or []
+
+    @property
+    def search_chunk_size(self) -> int:
+        return self.conf.get_int(AtlasSearchDataExtractor.ATLAS_SEARCH_CHUNK_SIZE_KEY)
+
+    @property
+    def relationships(self) -> Optional[List[str]]:
+        return AtlasSearchDataExtractor.REQUIRED_RELATIONSHIPS_BY_TYPE.get(self.entity_type)
+
+    def extract(self) -> Any:
+        if not self._extract_iter:
+            self._extract_iter = self._get_extract_iter()
+
+        try:
+            return next(self._extract_iter)
+        except StopIteration:
+            return None
+
+    def get_scope(self) -> str:
+        return 'extractor.atlas_search_data'
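+
+    # Builds the pyatlasclient `Atlas` client from the extractor config; every
+    # connection setting below falls back to DEFAULT_CONFIG when not provided.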
+    def _get_driver(self) -> Any:
+        return Atlas(host=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY),
+                     port=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY),
+                     username=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY),
+                     password=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY),
+                     protocol=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY),
+                     validate_ssl=self.conf.get_bool(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY),
+                     timeout=self.conf.get_int(AtlasSearchDataExtractor.ATLAS_TIMEOUT_SECONDS_KEY),
+                     max_retries=self.conf.get_int(AtlasSearchDataExtractor.ATLAS_MAX_RETRIES_KEY))
+
+    def _get_approximate_count_of_entities(self) -> int:
+        try:
+            # Fetch the table entities based on query terms
+            count_query = deepcopy(self.basic_search_query)
+
+            minimal_parameters = {
+                'includeClassificationAttributes': False,
+                'includeSubClassifications': False
+            }
+
+            count_query.update(minimal_parameters)
+
+            search_results = self.driver.search_basic.create(data=count_query)
+
+            count = search_results._data.get("approximateCount")
+        except Exception:
+            LOGGER.warning('Failed to fetch approximate count of entities.', exc_info=True)
+
+            count = 0
+
+        return count
+
+    def _get_entity_guids(self, start_offset: int) -> List[str]:
+        result = []
+
+        batch_start = start_offset
+        batch_end = start_offset + self.search_chunk_size
+
+        LOGGER.info(f'Collecting guids for batch: {batch_start}-{batch_end}')
+
+        _params = {'offset': str(batch_start), 'limit': str(self.search_chunk_size)}
+
+        full_params = deepcopy(self.dsl_search_query)
+        full_params.update(**_params)
+
+        try:
+            results = self.driver.search_dsl(**full_params)
+
+            for hit in results:
+                for entity in hit.entities:
+                    result.append(entity.guid)
+
+            return result
+        except Exception:
+            LOGGER.warning(f'Error processing batch: {batch_start}-{batch_end}', exc_info=True)
+
+            return []
+
+    def _get_entity_details(self, guid_list: List[str]) -> List:
+        result = []
+
+        LOGGER.info(f'Processing guids chunk of size: {len(guid_list)}')
+
+        try:
+            bulk_collection = self.driver.entity_bulk(guid=guid_list)
+
+            for collection in bulk_collection:
+                search_chunk = list(collection.entities_with_relationships(attributes=self.relationships))
+
+                result += search_chunk
+
+            return result
+        except Exception:
+            LOGGER.warning(f'Error processing guids chunk of size: {len(guid_list)}', exc_info=True)
+
+            return []
+
+    @staticmethod
+    def split_list_to_chunks(input_list: List[Any], n: int) -> Generator:
+        """Yield successive n-sized chunks from input_list."""
+        for i in range(0, len(input_list), n):
+            yield input_list[i:i + n]
+
+    def _execute_query(self) -> Any:
+        details_chunk_size = self.conf.get_int(AtlasSearchDataExtractor.ATLAS_DETAILS_CHUNK_SIZE_KEY)
+        process_pool_size = self.conf.get_int(AtlasSearchDataExtractor.PROCESS_POOL_SIZE_KEY)
+
+        guids = []
+
+        approximate_count = self._get_approximate_count_of_entities()
+
+        LOGGER.info(f'Received count: {approximate_count}')
+
+        if approximate_count > 0:
+            offsets = [i * self.search_chunk_size for i in range(int(approximate_count / self.search_chunk_size) + 1)]
+        else:
+            offsets = []
+
+        with multiprocessing.pool.ThreadPool(processes=process_pool_size) as pool:
+            guid_list = pool.map(self._get_entity_guids, offsets, chunksize=1)
+
+        for sub_list in guid_list:
+            guids += sub_list
+
+        LOGGER.info(f'Received guids: {len(guids)}')
+
+        if guids:
+            guids_chunks = AtlasSearchDataExtractor.split_list_to_chunks(guids, details_chunk_size)
+
+            with multiprocessing.pool.ThreadPool(processes=process_pool_size) as pool:
+                return_list = pool.map(self._get_entity_details, guids_chunks)
+
+            for sub_list in return_list:
+                for entry in sub_list:
+                    yield entry
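+
+    # Maps each raw Atlas entity onto the configured ES document model. The
+    # dotted paths from FIELDS_MAPPING_SPEC are resolved with reduce(), e.g.
+    # 'relationshipAttributes.db.displayText' walks
+    # data['relationshipAttributes']['db']['displayText'], yielding an empty
+    # dict (and therefore the default value) as soon as a segment is missing.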
+    def _get_extract_iter(self) -> Iterator[Any]:
+        for atlas_entity in self._execute_query():
+            model_dict = dict()
+
+            try:
+                data = atlas_entity.__dict__['_data']
+
+                for spec in self.field_mappings:
+                    model_field, atlas_field_path, _transform_spec, default_value = spec
+
+                    atlas_value = reduce(lambda x, y: x.get(y, dict()),
+                                         atlas_field_path.split('.'), data) or default_value
+
+                    transform_spec = _transform_spec or (lambda x: x)
+
+                    es_entity_value = transform_spec(atlas_value)
+                    model_dict[model_field] = es_entity_value
+
+                yield self.model_class(**model_dict)
+            except Exception:
+                LOGGER.warning('Error building model object.', exc_info=True)
diff --git a/example/scripts/sample_atlas_search_extractor.py b/example/scripts/sample_atlas_search_extractor.py
new file mode 100644
index 000000000..79c511aac
--- /dev/null
+++ b/example/scripts/sample_atlas_search_extractor.py
@@ -0,0 +1,88 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
+
+import uuid
+
+from elasticsearch import Elasticsearch
+from pyhocon import ConfigFactory
+
+from databuilder.extractor.atlas_search_data_extractor import AtlasSearchDataExtractor
+from databuilder.job.job import DefaultJob
+from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
+from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
+from databuilder.task.task import DefaultTask
+from databuilder.transformer.base_transformer import NoopTransformer
+
+entity_type = 'Table'
+extracted_search_data_path = '/tmp/search_data.json'
+process_pool_size = 5
+
+# atlas config
+atlas_url = 'localhost'
+atlas_port = 21000
+atlas_protocol = 'http'
+atlas_verify_ssl = False
+atlas_username = 'admin'
+atlas_password = 'admin'
+atlas_search_chunk_size = 200
+atlas_details_chunk_size = 10
+
+# elastic config
+es = Elasticsearch([
+    {'host': 'localhost'},
+])
+
+elasticsearch_client = es
+elasticsearch_new_index_key = 'tables-' + str(uuid.uuid4())
+elasticsearch_new_index_key_type = 'table'
+elasticsearch_index_alias = 'table_search_index'
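+
+# Config keys are namespaced per component: the 'extractor.atlas_search_data'
+# prefix matches AtlasSearchDataExtractor.get_scope(), so each component in
+# the job only sees its own slice of this config.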
+job_config = ConfigFactory.from_dict({
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY):
+        atlas_url,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY):
+        atlas_port,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY):
+        atlas_protocol,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY):
+        atlas_verify_ssl,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY):
+        atlas_username,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY):
+        atlas_password,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_SEARCH_CHUNK_SIZE_KEY):
+        atlas_search_chunk_size,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_DETAILS_CHUNK_SIZE_KEY):
+        atlas_details_chunk_size,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.PROCESS_POOL_SIZE_KEY):
+        process_pool_size,
+    'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ENTITY_TYPE_KEY):
+        entity_type,
+    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
+        extracted_search_data_path,
+    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
+        'w',
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
+        extracted_search_data_path,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
+        'r',
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
+        elasticsearch_client,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
+        elasticsearch_new_index_key,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
+        elasticsearch_new_index_key_type,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
+        elasticsearch_index_alias
+})
+
+if __name__ == "__main__":
+    task = DefaultTask(extractor=AtlasSearchDataExtractor(),
+                       transformer=NoopTransformer(),
+                       loader=FSElasticsearchJSONLoader())
+
+    job = DefaultJob(conf=job_config,
+                     task=task,
+                     publisher=ElasticsearchPublisher())
+
+    job.launch()
diff --git a/requirements.txt b/requirements.txt
index 22a175f23..249532e87 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -62,3 +62,5 @@ pandas>=0.21.0,<1.2.0
 
 requests==2.23.0,<3.0
 responses==0.10.6
+
+pyatlasclient==1.1.2
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 63f20e516..b7bde3eb8 100644
--- a/setup.py
+++ b/setup.py
@@ -1,9 +1,10 @@
 # Copyright Contributors to the Amundsen project.
 # SPDX-License-Identifier: Apache-2.0
 
+
 from setuptools import find_packages, setup
 
-__version__ = '4.0.3'
+__version__ = '4.0.4'
 
 
 requirements = [
@@ -64,6 +65,10 @@
     'feast==0.8.0'
 ]
 
+atlas = [
+    'pyatlasclient==1.1.2'
+]
+
 all_deps = requirements + kafka + cassandra + glue + snowflake + athena + \
     bigquery + jsonpath + db2 + dremio + druid + spark + feast
 
@@ -92,6 +97,7 @@
         'druid': druid,
         'delta': spark,
         'feast': feast,
+        'atlas': atlas
     },
     classifiers=[
         'Programming Language :: Python :: 3.6',