-
Notifications
You must be signed in to change notification settings - Fork 951
/
dremio_metadata_extractor.py
178 lines (148 loc) · 6.31 KB
/
dremio_metadata_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from collections import namedtuple
from itertools import groupby
from typing import (
Any, Dict, Iterator, Union,
)
from pyhocon import ConfigFactory, ConfigTree
from pyodbc import connect
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
# Key used to group the flat, column-level result rows into one entry per table.
TableKey = namedtuple('TableKey', 'schema table_name')

# Module-scoped logger named after this module.
LOGGER = logging.getLogger(__name__)
class DremioMetadataExtractor(Extractor):
    '''
    Extracts Dremio table and column metadata from underlying INFORMATION_SCHEMA table

    One row per column is fetched through a pyodbc cursor; consecutive rows that
    share the same schema and table name are folded into a single TableMetadata.

    Requirements:
        pyodbc & Dremio driver
    '''

    # ``{cluster}`` and ``{where_stmt}`` are filled in by ``init``. The
    # where_stmt placeholder also carries the terminating semicolon.
    SQL_STATEMENT = '''
    SELECT
      nested_1.COLUMN_NAME AS col_name,
      CAST(NULL AS VARCHAR) AS col_description,
      nested_1.DATA_TYPE AS col_type,
      nested_1.ORDINAL_POSITION AS col_sort_order,
      nested_1.TABLE_CATALOG AS database,
      '{cluster}' AS cluster,
      nested_1.TABLE_SCHEMA AS schema,
      nested_1.TABLE_NAME AS name,
      CAST(NULL AS VARCHAR) AS description,
      CASE WHEN nested_0.TABLE_TYPE='VIEW' THEN TRUE ELSE FALSE END AS is_view
    FROM (
      SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
      FROM INFORMATION_SCHEMA."TABLES"
    ) nested_0
    RIGHT JOIN (
      SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION
      FROM INFORMATION_SCHEMA."COLUMNS"
    ) nested_1 ON nested_0.TABLE_NAME = nested_1.TABLE_NAME
      AND nested_0.TABLE_SCHEMA = nested_1.TABLE_SCHEMA
      AND nested_0.TABLE_CATALOG = nested_1.TABLE_CATALOG
    {where_stmt}
    '''

    # Config keys
    DREMIO_USER_KEY = 'user_key'
    DREMIO_PASSWORD_KEY = 'password_key'
    DREMIO_HOST_KEY = 'host_key'
    DREMIO_PORT_KEY = 'port_key'
    DREMIO_DRIVER_KEY = 'driver_key'
    DREMIO_CLUSTER_KEY = 'cluster_key'
    DREMIO_EXCLUDE_SYS_TABLES_KEY = 'exclude_system_tables'
    DREMIO_EXCLUDE_PDS_TABLES_KEY = 'exclude_pds_tables'

    # Default values
    DEFAULT_AUTH_USER = 'dremio_auth_user'
    DEFAULT_AUTH_PW = 'dremio_auth_pw'
    DEFAULT_HOST = 'localhost'
    DEFAULT_PORT = '31010'
    DEFAULT_DRIVER = 'DSN=Dremio Connector'
    DEFAULT_CLUSTER_NAME = 'Production'
    DEFAULT_EXCLUDE_SYS_TABLES = True
    DEFAULT_EXCLUDE_PDS_TABLES = False

    # Default config
    DEFAULT_CONFIG = ConfigFactory.from_dict({
        DREMIO_USER_KEY: DEFAULT_AUTH_USER,
        DREMIO_PASSWORD_KEY: DEFAULT_AUTH_PW,
        DREMIO_HOST_KEY: DEFAULT_HOST,
        DREMIO_PORT_KEY: DEFAULT_PORT,
        DREMIO_DRIVER_KEY: DEFAULT_DRIVER,
        DREMIO_CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
        DREMIO_EXCLUDE_SYS_TABLES_KEY: DEFAULT_EXCLUDE_SYS_TABLES,
        DREMIO_EXCLUDE_PDS_TABLES_KEY: DEFAULT_EXCLUDE_PDS_TABLES
    })

    def init(self, conf: ConfigTree) -> None:
        '''
        Builds the metadata query and opens the pyodbc cursor used to run it

        :param conf: extractor configuration; missing keys fall back to DEFAULT_CONFIG
        :return:
        '''
        conf = conf.with_fallback(DremioMetadataExtractor.DEFAULT_CONFIG)

        exclude_sys_tables = conf.get_bool(DremioMetadataExtractor.DREMIO_EXCLUDE_SYS_TABLES_KEY)
        exclude_pds_tables = conf.get_bool(DremioMetadataExtractor.DREMIO_EXCLUDE_PDS_TABLES_KEY)

        excluded_table_types = []
        if exclude_sys_tables:
            excluded_table_types.append('SYSTEM_TABLE')
        if exclude_pds_tables:
            excluded_table_types.append('TABLE')
        where_stmt = self._build_where_stmt(excluded_table_types)

        self._cluster = conf.get_string(DremioMetadataExtractor.DREMIO_CLUSTER_KEY)

        self.sql_stmt = DremioMetadataExtractor.SQL_STATEMENT.format(
            cluster=self._cluster,
            where_stmt=where_stmt
        )

        LOGGER.info('SQL for Dremio metadata: %s', self.sql_stmt)

        self._pyodbc_cursor = connect(
            conf.get_string(DremioMetadataExtractor.DREMIO_DRIVER_KEY),
            uid=conf.get_string(DremioMetadataExtractor.DREMIO_USER_KEY),
            pwd=conf.get_string(DremioMetadataExtractor.DREMIO_PASSWORD_KEY),
            host=conf.get_string(DremioMetadataExtractor.DREMIO_HOST_KEY),
            port=conf.get_string(DremioMetadataExtractor.DREMIO_PORT_KEY),
            autocommit=True).cursor()

        # Created lazily on the first call to extract()
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        '''
        Returns the next TableMetadata, or None once every table has been emitted

        :return:
        '''
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        return next(self._extract_iter, None)

    def get_scope(self) -> str:
        '''
        Configuration scope of this extractor

        :return:
        '''
        return 'extractor.dremio'

    @staticmethod
    def _build_where_stmt(excluded_table_types: Any) -> str:
        '''
        Builds the tail of the query: an optional WHERE clause plus the closing ';'

        :param excluded_table_types: TABLE_TYPE values that must be filtered out
        :return: e.g. "WHERE nested_0.TABLE_TYPE != 'TABLE';" or just ';'
        '''
        conditions = [
            "nested_0.TABLE_TYPE != '{}'".format(table_type)
            for table_type in excluded_table_types
        ]
        if not conditions:
            return ';'
        return 'WHERE ' + ' AND '.join(conditions) + ';'

    @staticmethod
    def _as_bool(value: Any) -> bool:
        '''
        Normalises the is_view column into a real bool

        The flag may come back as a string ('true'/'false') or as a native
        boolean/integer, depending on the driver; a bare comparison with
        'true' would report every native boolean as False.

        :param value: raw is_view value from the result row
        :return:
        '''
        if isinstance(value, str):
            return value.strip().lower() == 'true'
        return bool(value)

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        '''
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata

        NOTE(review): groupby only merges *consecutive* rows with the same key
        and the query has no ORDER BY, so this relies on the server returning
        the columns of a table next to each other - confirm.

        :return:
        '''
        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            rows = list(group)  # groupby never yields an empty group
            last_row = rows[-1]

            columns = [
                ColumnMetadata(
                    row['col_name'],
                    row['col_description'],
                    row['col_type'],
                    row['col_sort_order'])
                for row in rows
            ]

            yield TableMetadata(last_row['database'],
                                last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                last_row['description'],
                                columns,
                                self._as_bool(last_row['is_view']))

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        '''
        Provides iterator of result rows from the pyodbc cursor, one dict per row
        keyed by the column names reported in the cursor description

        :return:
        '''
        cursor = self._pyodbc_cursor
        result = cursor.execute(self.sql_stmt)

        # The description does not change between rows, so read it only once.
        column_names = [column[0] for column in cursor.description or ()]

        for row in result:
            yield dict(zip(column_names, row))

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        '''
        Table key consists of schema and table name

        NOTE(review): the catalog ('database') is not part of the key - confirm
        that schema + table name is unique across catalogs.

        :param row:
        :return:
        '''
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])

        return None