feat: Dremio extractor #377

Merged (1 commit) on Oct 8, 2020
177 changes: 177 additions & 0 deletions databuilder/extractor/dremio_metadata_extractor.py
@@ -0,0 +1,177 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0


from collections import namedtuple
from itertools import groupby
import logging
from typing import Iterator, Union, Dict, Any

from pyhocon import ConfigFactory, ConfigTree
from pyodbc import connect

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class DremioMetadataExtractor(Extractor):
'''
Extracts Dremio table and column metadata from the underlying INFORMATION_SCHEMA tables

Requirements:
pyodbc and the Dremio ODBC driver
'''

SQL_STATEMENT = '''
SELECT
nested_1.COLUMN_NAME AS col_name,
CAST(NULL AS VARCHAR) AS col_description,
nested_1.DATA_TYPE AS col_type,
nested_1.ORDINAL_POSITION AS col_sort_order,
nested_1.TABLE_CATALOG AS database,
'{cluster}' AS cluster,
nested_1.TABLE_SCHEMA AS schema,
nested_1.TABLE_NAME AS name,
CAST(NULL AS VARCHAR) AS description,
CASE WHEN nested_0.TABLE_TYPE='VIEW' THEN TRUE ELSE FALSE END AS is_view
FROM (
SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA."TABLES"
) nested_0
RIGHT JOIN (
SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION
FROM INFORMATION_SCHEMA."COLUMNS"
) nested_1 ON nested_0.TABLE_NAME = nested_1.TABLE_NAME
AND nested_0.TABLE_SCHEMA = nested_1.TABLE_SCHEMA
AND nested_0.TABLE_CATALOG = nested_1.TABLE_CATALOG
{where_stmt}
'''
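# {cluster} and {where_stmt} in SQL_STATEMENT are substituted in init() from the configuration below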

# Config keys
DREMIO_USER_KEY = 'user_key'
DREMIO_PASSWORD_KEY = 'password_key'
DREMIO_HOST_KEY = 'host_key'
DREMIO_PORT_KEY = 'port_key'
DREMIO_DRIVER_KEY = 'driver_key'
DREMIO_CLUSTER_KEY = 'cluster_key'
DREMIO_EXCLUDE_SYS_TABLES_KEY = 'exclude_system_tables'
DREMIO_EXCLUDE_PDS_TABLES_KEY = 'exclude_pds_tables'

# Default values
DEFAULT_AUTH_USER = 'dremio_auth_user'
DEFAULT_AUTH_PW = 'dremio_auth_pw'
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = '31010'
DEFAULT_DRIVER = 'DSN=Dremio Connector'
DEFAULT_CLUSTER_NAME = 'Production'
DEFAULT_EXCLUDE_SYS_TABLES = True
DEFAULT_EXCLUDE_PDS_TABLES = False

# Default config
DEFAULT_CONFIG = ConfigFactory.from_dict({
DREMIO_USER_KEY: DEFAULT_AUTH_USER,
DREMIO_PASSWORD_KEY: DEFAULT_AUTH_PW,
DREMIO_HOST_KEY: DEFAULT_HOST,
DREMIO_PORT_KEY: DEFAULT_PORT,
DREMIO_DRIVER_KEY: DEFAULT_DRIVER,
DREMIO_CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
DREMIO_EXCLUDE_SYS_TABLES_KEY: DEFAULT_EXCLUDE_SYS_TABLES,
DREMIO_EXCLUDE_PDS_TABLES_KEY: DEFAULT_EXCLUDE_PDS_TABLES
})

def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(DremioMetadataExtractor.DEFAULT_CONFIG)

exclude_sys_tables = conf.get_bool(DremioMetadataExtractor.DREMIO_EXCLUDE_SYS_TABLES_KEY)
exclude_pds_tables = conf.get_bool(DremioMetadataExtractor.DREMIO_EXCLUDE_PDS_TABLES_KEY)
if exclude_sys_tables and exclude_pds_tables:
where_stmt = ("WHERE nested_0.TABLE_TYPE != 'SYSTEM_TABLE' AND "
"nested_0.TABLE_TYPE != 'TABLE';")
elif exclude_sys_tables:
where_stmt = "WHERE nested_0.TABLE_TYPE != 'SYSTEM_TABLE';"
elif exclude_pds_tables:
where_stmt = "WHERE nested_0.TABLE_TYPE != 'TABLE';"
else:
where_stmt = ';'

self._cluster = conf.get_string(DremioMetadataExtractor.DREMIO_CLUSTER_KEY)

self.sql_stmt = DremioMetadataExtractor.SQL_STATEMENT.format(
cluster=self._cluster,
where_stmt=where_stmt
)

LOGGER.info('SQL for Dremio metadata: {}'.format(self.sql_stmt))

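# Open an ODBC connection using the configured driver/DSN string and credentials,
# and keep a cursor for issuing the metadata query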
self._pyodbc_cursor = connect(
conf.get_string(DremioMetadataExtractor.DREMIO_DRIVER_KEY),
uid=conf.get_string(DremioMetadataExtractor.DREMIO_USER_KEY),
pwd=conf.get_string(DremioMetadataExtractor.DREMIO_PASSWORD_KEY),
host=conf.get_string(DremioMetadataExtractor.DREMIO_HOST_KEY),
port=conf.get_string(DremioMetadataExtractor.DREMIO_PORT_KEY),
autocommit=True).cursor()

self._extract_iter: Union[None, Iterator] = None

def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self) -> str:
return 'extractor.dremio'

def _get_extract_iter(self) -> Iterator[TableMetadata]:
'''
Groups rows from the raw row iterator by table using itertools.groupby and yields one TableMetadata per table
:return: Iterator of TableMetadata
'''
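# NOTE: itertools.groupby only merges consecutive rows with the same key, so this
# relies on the query returning all columns of a given table in adjacent rows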
for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []

for row in group:
last_row = row
columns.append(ColumnMetadata(
row['col_name'],
row['col_description'],
row['col_type'],
row['col_sort_order'])
)

yield TableMetadata(last_row['database'],
last_row['cluster'],
last_row['schema'],
last_row['name'],
last_row['description'],
columns,
last_row['is_view'] == 'true')

def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
'''
Provides an iterator of result rows from the pyodbc cursor, with each row keyed by column name
:return: Iterator of Dict[str, Any]
'''

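# cursor.description holds one (name, type, ...) tuple per result column;
# zip the column names with each row's values to build a dict per row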
for row in self._pyodbc_cursor.execute(self.sql_stmt):
yield dict(zip([c[0] for c in self._pyodbc_cursor.description], row))

def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
'''
Table key consists of schema and table name
:param row:
:return:
'''
if row:
return TableKey(schema=row['schema'], table_name=row['name'])

return None
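
For reference, here is a minimal sketch of driving the extractor directly, outside of a DefaultTask/DefaultJob (the sample script below shows the full pipeline). It assumes a reachable Dremio coordinator and an installed Dremio ODBC driver; the host and credentials are placeholders only:

from pyhocon import ConfigFactory

from databuilder.extractor.dremio_metadata_extractor import DremioMetadataExtractor

extractor = DremioMetadataExtractor()
extractor.init(ConfigFactory.from_dict({
    DremioMetadataExtractor.DREMIO_HOST_KEY: 'dremio.example.com',   # placeholder host
    DremioMetadataExtractor.DREMIO_USER_KEY: 'amundsen',             # placeholder credentials
    DremioMetadataExtractor.DREMIO_PASSWORD_KEY: 'secret',
    DremioMetadataExtractor.DREMIO_EXCLUDE_PDS_TABLES_KEY: True,
}))

# extract() returns one TableMetadata per call and None once the result set is exhausted
record = extractor.extract()
while record:
    print(record)
    record = extractor.extract()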
161 changes: 161 additions & 0 deletions example/scripts/sample_dremio_data_loader.py
@@ -0,0 +1,161 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
An example script demonstrating how to load data into Neo4j without using an Airflow DAG.
"""

import logging
import os
from pyhocon import ConfigFactory
import uuid
import sys

from databuilder.extractor.dremio_metadata_extractor import DremioMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from elasticsearch.client import Elasticsearch
from databuilder.transformer.base_transformer import NoopTransformer

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

# Dremio connection values; change these to match your environment
DREMIO_HOST = 'localhost'
DREMIO_USER = 'dremio'
DREMIO_PASSWORD = 'test'

# set env NEO4J_HOST to override localhost
NEO4J_ENDPOINT = 'bolt://{}:7687'.format(os.getenv('NEO4J_HOST', 'localhost'))
NEO4J_USER = 'neo4j'
NEO4J_PASSWORD = 'test'

es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]

es = Elasticsearch([
{'host': es_host if es_host else 'localhost'},
])


def create_sample_dremio_job():

tmp_folder = '/var/tmp/amundsen/{}'.format('tables')
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

extractor = DremioMetadataExtractor()
loader = FsNeo4jCSVLoader()

task = DefaultTask(extractor=extractor,
loader=loader)

job_config = ConfigFactory.from_dict({
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_USER_KEY): DREMIO_USER,
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_PASSWORD_KEY): DREMIO_PASSWORD,
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_HOST_KEY): DREMIO_HOST,
'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_EXCLUDE_PDS_TABLES_KEY): True,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.FORCE_CREATE_DIR): True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): NEO4J_ENDPOINT,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): NEO4J_USER,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): NEO4J_PASSWORD,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag'
})
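# NOTE: JOB_PUBLISH_TAG above is expected to be unique per publish run
# (e.g. an execution date); 'unique_tag' is only a sample value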

job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())

return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=None):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` query baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())

# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): NEO4J_ENDPOINT,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): NEO4J_USER,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): NEO4J_PASSWORD,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_doc_type_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias,
})

# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
cypher_query)
if elasticsearch_mapping:
job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
elasticsearch_mapping)

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job


if __name__ == "__main__":
# Push code to Neo4j from Dremio
job = create_sample_dremio_job()
job.launch()

# Push data to Elasticsearch from Neo4j
job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()
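
The Dremio connection values at the top of this script are hard-coded for the demo. If preferred, they can be read from the environment the same way NEO4J_HOST is; a small sketch (the variable names here are only suggestions):

import os

# read Dremio connection settings from the environment, falling back to the sample defaults
DREMIO_HOST = os.getenv('DREMIO_HOST', 'localhost')
DREMIO_USER = os.getenv('DREMIO_USER', 'dremio')
DREMIO_PASSWORD = os.getenv('DREMIO_PASSWORD', 'test')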
8 changes: 7 additions & 1 deletion setup.py
@@ -47,11 +47,16 @@
'ibm-db-sa-py3==0.3.1-1'
]

dremio = [
'pyodbc==4.0.30'
]

druid = [
'pydruid'
]

all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 + druid
all_deps = requirements + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid

setup(
name='amundsen-databuilder',
@@ -75,6 +80,7 @@
'bigquery': bigquery,
'jsonpath': jsonpath,
'db2': db2,
'dremio': dremio,
'druid': druid,
},
classifiers=[
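
With the dremio extra declared above, the extractor's pyodbc dependency can be pulled in through pip's extras syntax, for example pip install 'amundsen-databuilder[dremio]'. The Dremio ODBC driver referenced by the default 'DSN=Dremio Connector' string still has to be installed and configured separately on the machine running the extractor.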