-
Notifications
You must be signed in to change notification settings - Fork 951
/
glue_extractor.py
124 lines (107 loc) · 4.21 KB
/
glue_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from typing import (
Any, Dict, Iterator, List, Union,
)
import boto3
from pyhocon import ConfigFactory, ConfigTree
from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
class GlueExtractor(Extractor):
    """
    Extracts tables and columns metadata from AWS Glue metastore

    Configuration keys (defaults are in DEFAULT_CONFIG):

    * ``cluster``: cluster name handed to every emitted TableMetadata.
    * ``filters``: value forwarded as ``Filters`` to Glue ``search_tables``.
    * ``max_results``: value forwarded as ``MaxResults`` to ``search_tables``.
    * ``resource_share_type``: value forwarded as ``ResourceShareType``.
    * ``region``: region name for the boto3 Glue client; when None the client
      is created without an explicit region.
    * ``partition_badge_label``: badge attached to columns that come from the
      table's ``PartitionKeys``; no badge is attached when unset.
    """

    CLUSTER_KEY = 'cluster'
    FILTER_KEY = 'filters'
    MAX_RESULTS_KEY = 'max_results'
    RESOURCE_SHARE_TYPE = 'resource_share_type'
    REGION_NAME_KEY = "region"
    PARTITION_BADGE_LABEL_KEY = "partition_badge_label"

    DEFAULT_CONFIG = ConfigFactory.from_dict({
        CLUSTER_KEY: 'gold',
        FILTER_KEY: None,
        MAX_RESULTS_KEY: 500,
        RESOURCE_SHARE_TYPE: "ALL",
        REGION_NAME_KEY: None,
        PARTITION_BADGE_LABEL_KEY: None,
    })

    def init(self, conf: ConfigTree) -> None:
        """
        Read the extractor settings and create the Glue client.

        :param conf: extractor configuration; keys that are not set fall back
            to DEFAULT_CONFIG
        """
        conf = conf.with_fallback(GlueExtractor.DEFAULT_CONFIG)
        self._cluster = conf.get_string(GlueExtractor.CLUSTER_KEY)
        self._filters = conf.get(GlueExtractor.FILTER_KEY)
        self._max_results = conf.get(GlueExtractor.MAX_RESULTS_KEY)
        self._resource_share_type = conf.get(GlueExtractor.RESOURCE_SHARE_TYPE)
        self._region_name = conf.get(GlueExtractor.REGION_NAME_KEY)
        self._partition_badge_label = conf.get(GlueExtractor.PARTITION_BADGE_LABEL_KEY)

        if self._region_name is not None:
            self._glue = boto3.client('glue', region_name=self._region_name)
        else:
            self._glue = boto3.client('glue')

        # Created lazily on the first extract() call.
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        """
        Return the next table, or None once every table has been returned.

        :return: the next TableMetadata, or None when the iterator is exhausted
        """
        # Compare against None explicitly instead of relying on the
        # truthiness of the iterator object.
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self) -> str:
        """
        :return: the configuration scope of this extractor
        """
        return 'extractor.glue'

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        It gets all tables and yields TableMetadata

        Rows without a ``StorageDescriptor`` entry are skipped.

        :return: iterator of TableMetadata, one per remaining row
        """
        for row in self._get_raw_extract_iter():
            if 'StorageDescriptor' not in row:
                continue

            # Fall back to the 'comment' table parameter when the row carries
            # no (or an empty) 'Description'.
            description = row.get('Description') or (row.get('Parameters') or {}).get('comment')

            yield TableMetadata(
                'glue',
                self._cluster,
                row['DatabaseName'],
                row['Name'],
                description,
                self._build_columns(row),
                # True when Glue reports the table type as VIRTUAL_VIEW.
                row.get('TableType') == 'VIRTUAL_VIEW',
            )

    def _build_columns(self, row: Dict[str, Any]) -> List[ColumnMetadata]:
        """
        Build the column list of one table row: storage descriptor columns
        first, then partition keys, with a sort order that runs across both.

        :param row: one table entry that contains a ``StorageDescriptor``
        :return: list of ColumnMetadata in sort order
        """
        # A storage descriptor without 'Columns' yields no regular columns
        # instead of raising KeyError and aborting the whole extraction.
        storage_columns = row['StorageDescriptor'].get('Columns') or []
        partition_keys = row.get('PartitionKeys') or []

        columns = [
            ColumnMetadata(
                name=column["Name"],
                description=column.get("Comment"),
                col_type=column["Type"],
                sort_order=position,
            )
            for position, column in enumerate(storage_columns)
        ]

        # Partition keys continue the numbering after the regular columns.
        first_partition_position = len(columns)
        columns.extend(
            ColumnMetadata(
                name=column["Name"],
                description=column.get("Comment"),
                col_type=column["Type"],
                sort_order=position,
                badges=[self._partition_badge_label] if self._partition_badge_label else None,
            )
            for position, column in enumerate(partition_keys, start=first_partition_position)
        )
        return columns

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Provides iterator of results row from glue client

        :return: iterator over the table entries collected by _search_tables
        """
        return iter(self._search_tables())

    def _search_tables(self) -> List[Dict[str, Any]]:
        """
        Call Glue ``search_tables`` page by page, following ``NextToken``,
        and collect the ``TableList`` of every page.

        :return: all table entries returned by the search
        """
        kwargs: Dict[str, Any] = {}
        if self._filters is not None:
            kwargs['Filters'] = self._filters
            # NOTE(review): the page size is only sent together with Filters
            # here; confirm whether max_results should also apply to
            # unfiltered searches.
            kwargs['MaxResults'] = self._max_results
        if self._resource_share_type:
            kwargs['ResourceShareType'] = self._resource_share_type

        tables: List[Dict[str, Any]] = []
        while True:
            data = self._glue.search_tables(**kwargs)
            tables += data['TableList']
            # Stop on a missing or empty token so a blank token can never
            # cause the same page to be requested forever.
            token = data.get('NextToken')
            if not token:
                break
            kwargs['NextToken'] = token
        return tables