# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import concurrent.futures
import logging
from collections import namedtuple
from datetime import datetime
from typing import ( # noqa: F401
Any, Dict, Iterator, List, Optional, Tuple, Union,
)
from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from pyspark.sql import SparkSession
from pyspark.sql.catalog import Table
from pyspark.sql.types import (
ArrayType, MapType, StructField, StructType,
)
from pyspark.sql.utils import AnalysisException, ParseException
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.models.table_last_updated import TableLastUpdated
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
from databuilder.models.watermark import Watermark
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
LOGGER = logging.getLogger(__name__)
# TODO once column tags work properly, consider deprecating this for TableMetadata directly
class ScrapedColumnMetadata(object):
def __init__(self, name: str, data_type: str, description: Optional[str], sort_order: int,
badges: Union[List[str], None] = None):
self.name = name
self.data_type = data_type
self.description = description
self.sort_order = sort_order
self.is_partition = False
self.attributes: Dict[str, str] = {}
self.badges = badges
def set_is_partition(self, is_partition: bool) -> None:
self.is_partition = is_partition
def set_badges(self, badges: Union[List[str], None]) -> None:
self.badges = badges
def __eq__(self, other: object) -> bool:
if not isinstance(other, ScrapedColumnMetadata):
return False
return (self.name == other.name and
self.data_type == other.data_type and
self.description == other.description and
self.sort_order == other.sort_order and
self.is_partition == other.is_partition and
self.attributes == other.attributes and
self.badges == other.badges)
def __repr__(self) -> str:
return f'{self.name}:{self.data_type}'
# TODO consider deprecating this for using TableMetadata directly
class ScrapedTableMetadata(object):
LAST_MODIFIED_KEY = 'lastModified'
DESCRIPTION_KEY = 'description'
TABLE_FORMAT_KEY = 'format'
def __init__(self, schema: str, table: str):
self.schema: str = schema
self.table: str = table
self.table_detail: Optional[Dict] = None
self.view_detail: Optional[Dict] = None
self.is_view: bool = False
self.failed_to_scrape: bool = False
self.columns: Optional[List[ScrapedColumnMetadata]] = None
def set_table_detail(self, table_detail: Dict) -> None:
self.table_detail = table_detail
self.is_view = False
self.failed_to_scrape = False
def set_view_detail(self, view_detail: Dict) -> None:
self.view_detail = view_detail
self.is_view = True
self.failed_to_scrape = False
def get_details(self) -> Optional[Dict]:
if self.is_view:
return self.view_detail
else:
return self.table_detail
def get_full_table_name(self) -> str:
return self.schema + "." + self.table
def set_failed_to_scrape(self) -> None:
self.failed_to_scrape = True
def set_columns(self, column_list: List[ScrapedColumnMetadata]) -> None:
self.columns = column_list
def get_last_modified(self) -> Optional[datetime]:
details = self.get_details()
if details and self.LAST_MODIFIED_KEY in details:
return details[self.LAST_MODIFIED_KEY]
else:
return None
def get_table_description(self) -> Optional[str]:
details = self.get_details()
if details and self.DESCRIPTION_KEY in details:
return details[self.DESCRIPTION_KEY]
else:
return None
def is_delta_table(self) -> bool:
details = self.get_details()
if details and self.TABLE_FORMAT_KEY in details:
return details[self.TABLE_FORMAT_KEY].lower() == 'delta'
else:
return False
def __repr__(self) -> str:
return f'{self.schema}.{self.table}'
class DeltaLakeMetadataExtractor(Extractor):
"""
Extracts Delta Lake Metadata.
This requires a spark session to run that has a hive metastore populated with all of the delta tables
that you are interested in.
By default, the extractor does not extract nested columns. Set the EXTRACT_NESTED_COLUMNS conf to True
if you would like nested columns extracted
"""
# CONFIG KEYS
DATABASE_KEY = "database"
# If you want to exclude specific schemas
EXCLUDE_LIST_SCHEMAS_KEY = "exclude_list"
# If you want to only include specific schemas
SCHEMA_LIST_KEY = "schema_list"
CLUSTER_KEY = "cluster"
# By default, this will only process and emit delta-lake tables, but it can support all hive table types.
DELTA_TABLES_ONLY = "delta_tables_only"
DEFAULT_CONFIG = ConfigFactory.from_dict({DATABASE_KEY: "delta",
EXCLUDE_LIST_SCHEMAS_KEY: [],
SCHEMA_LIST_KEY: [],
DELTA_TABLES_ONLY: True})
PARTITION_COLUMN_TAG = 'is_partition'
# For backwards compatibility, the delta lake extractor does not extract nested columns for indexing
# Set this to true in the conf if you would like nested columns & complex types fully extracted
EXTRACT_NESTED_COLUMNS = "extract_nested_columns"
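    # Illustrative config sketch (not part of the original module): when init() is called directly,
    # the keys above are read from the top level of the ConfigTree. The cluster and schema values
    # below are placeholders.
    #
    #   conf = ConfigFactory.from_dict({
    #       DeltaLakeMetadataExtractor.CLUSTER_KEY: 'my_cluster',          # required, no default
    #       DeltaLakeMetadataExtractor.DATABASE_KEY: 'delta',              # default: 'delta'
    #       DeltaLakeMetadataExtractor.SCHEMA_LIST_KEY: ['sales'],         # only scrape these schemas
    #       DeltaLakeMetadataExtractor.EXCLUDE_LIST_SCHEMAS_KEY: [],       # or exclude schemas instead
    #       DeltaLakeMetadataExtractor.DELTA_TABLES_ONLY: True,            # default: True
    #       DeltaLakeMetadataExtractor.EXTRACT_NESTED_COLUMNS: False,      # default: False
    #   })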
def init(self, conf: ConfigTree) -> None:
self.conf = conf.with_fallback(DeltaLakeMetadataExtractor.DEFAULT_CONFIG)
self._extract_iter = None # type: Union[None, Iterator]
self._cluster = self.conf.get_string(DeltaLakeMetadataExtractor.CLUSTER_KEY)
self._db = self.conf.get_string(DeltaLakeMetadataExtractor.DATABASE_KEY)
self.exclude_list = self.conf.get_list(DeltaLakeMetadataExtractor.EXCLUDE_LIST_SCHEMAS_KEY)
self.schema_list = self.conf.get_list(DeltaLakeMetadataExtractor.SCHEMA_LIST_KEY)
self.delta_tables_only = self.conf.get_bool(DeltaLakeMetadataExtractor.DELTA_TABLES_ONLY)
self.extract_nested_columns = self.conf.get_bool(DeltaLakeMetadataExtractor.EXTRACT_NESTED_COLUMNS,
default=False)
def set_spark(self, spark: SparkSession) -> None:
self.spark = spark
    def extract(self) -> Union[TableMetadata, Watermark, TableLastUpdated, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
def get_scope(self) -> str:
return 'extractor.delta_lake_table_metadata'
def _get_extract_iter(self) -> Iterator[Union[TableMetadata, Watermark, TableLastUpdated,
None]]:
"""
Given either a list of schemas, or a list of exclude schemas,
it will query hive metastore and then access delta log
to get all of the metadata for your delta tables. It will produce:
- table and column metadata (including partition watermarks)
- last updated information
"""
if self.schema_list:
LOGGER.info("working on %s", self.schema_list)
tables = self.get_all_tables(self.schema_list)
else:
LOGGER.info("fetching all schemas")
LOGGER.info("Excluding: %s", self.exclude_list)
schemas = self.get_schemas(self.exclude_list)
LOGGER.info("working on %s", schemas)
tables = self.get_all_tables(schemas)
# TODO add the programmatic information as well?
scraped_tables = self.scrape_all_tables(tables)
for scraped_table in scraped_tables:
if not scraped_table:
continue
if self.delta_tables_only and not scraped_table.is_delta_table():
LOGGER.info("Skipping none delta table %s", scraped_table.table)
continue
else:
yield self.create_table_metadata(scraped_table)
watermarks = self.create_table_watermarks(scraped_table)
if watermarks:
for watermark in watermarks:
yield watermark[0]
yield watermark[1]
last_updated = self.create_table_last_updated(scraped_table)
if last_updated:
yield last_updated
def get_schemas(self, exclude_list: List[str]) -> List[str]:
'''Returns all schemas.'''
schemas = self.spark.catalog.listDatabases()
ret = []
for schema in schemas:
if schema.name not in exclude_list:
ret.append(schema.name)
return ret
def get_all_tables(self, schemas: List[str]) -> List[Table]:
'''Returns all tables.'''
ret = []
for schema in schemas:
ret.extend(self.get_tables_for_schema(schema))
return ret
def get_tables_for_schema(self, schema: str) -> List[Table]:
'''Returns all tables for a specific schema.'''
return self.spark.catalog.listTables(schema)
def scrape_all_tables(self, tables: List[Table]) -> List[Optional[ScrapedTableMetadata]]:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.scrape_table, table) for table in tables]
scraped_tables = [f.result() for f in futures]
return scraped_tables
def scrape_table(self, table: Table) -> Optional[ScrapedTableMetadata]:
'''Takes a table object and creates a scraped table metadata object.'''
met = ScrapedTableMetadata(schema=table.database, table=table.name)
table_name = met.get_full_table_name()
if table.tableType and table.tableType.lower() != 'view':
table_detail = self.scrape_table_detail(table_name)
if table_detail is None:
LOGGER.error("Failed to parse table " + table_name)
met.set_failed_to_scrape()
return None
else:
LOGGER.info("Successfully parsed table " + table_name)
met.set_table_detail(table_detail)
else:
view_detail = self.scrape_view_detail(table_name)
if view_detail is None:
LOGGER.error("Failed to parse view " + table_name)
met.set_failed_to_scrape()
return None
else:
LOGGER.info("Successfully parsed view " + table_name)
met.set_view_detail(view_detail)
columns = self.fetch_columns(met.schema, met.table)
if not columns:
LOGGER.error("Failed to parse columns for " + table_name)
return None
else:
met.set_columns(columns)
return met
def scrape_table_detail(self, table_name: str) -> Optional[Dict]:
try:
table_details_df = self.spark.sql(f"describe detail {table_name}")
table_detail = table_details_df.collect()[0]
return table_detail.asDict()
except Exception as e:
LOGGER.error(e)
return None
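    # Illustrative shape of the `DESCRIBE DETAIL` row consumed above (field availability depends on
    # the Spark/Delta version; values are placeholders, not real output):
    #
    #   {
    #       'format': 'delta',                        # checked by is_delta_table()
    #       'description': 'optional table comment',  # surfaced as the table description
    #       'lastModified': datetime(...),            # surfaced via TableLastUpdated
    #       'createdAt': datetime(...),               # used as the watermark create_time
    #       'partitionColumns': ['ds'],               # drives watermark extraction
    #       ...
    #   }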
def scrape_view_detail(self, view_name: str) -> Optional[Dict]:
# TODO the blanket try catches need to be changed
describeExtendedOutput = []
try:
describeExtendedOutput = self.spark.sql(f"describe extended {view_name}").collect()
except Exception as e:
LOGGER.error(e)
return None
view_detail = {}
startAdding = False
for row in describeExtendedOutput:
row_dict = row.asDict()
if startAdding:
view_detail[row_dict['col_name']] = row_dict['data_type']
if "# Detailed Table" in row_dict['col_name']:
# Then start parsing
startAdding = True
return view_detail
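    # Illustrative sketch of how the `DESCRIBE EXTENDED` rows are folded into view_detail above
    # (names and values are placeholders): rows before the '# Detailed Table ...' marker are the
    # column listing and are skipped; rows after it are collected as col_name -> data_type pairs.
    #
    #   col_name                        data_type
    #   id                              bigint        <- skipped (column section)
    #   # Detailed Table Information                  <- marker row; start collecting after this
    #   Database                        my_schema     -> view_detail['Database'] = 'my_schema'
    #   Type                            VIEW          -> view_detail['Type'] = 'VIEW'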
def fetch_columns(self, schema: str, table: str) -> List[ScrapedColumnMetadata]:
        '''Fetches Delta table columns. In the general case, spark.catalog.listColumns cannot be
        relied upon for Delta tables, so the output of DESCRIBE is parsed instead.'''
raw_columns = []
field_dict: Dict[str, Any] = {}
table_name = f"{schema}.{table}"
try:
raw_columns = self.spark.sql(f"describe {table_name}").collect()
for field in self.spark.table(f"{table_name}").schema:
field_dict[field.name] = field
except (AnalysisException, ParseException) as e:
LOGGER.error(e)
return []
parsed_columns: Dict[str, ScrapedColumnMetadata] = {}
partition_cols = False
sort_order = 0
for row in raw_columns:
col_name = row['col_name']
# NOTE: the behavior of describe has changed between spark 2 and spark 3
if col_name == '' or '#' in col_name:
partition_cols = True
continue
if not partition_cols:
# Attempt to extract nested columns if conf value requests it
if self.extract_nested_columns \
and col_name in field_dict \
and self.is_complex_delta_type(field_dict[col_name].dataType):
sort_order = self._iterate_complex_type("", field_dict[col_name], parsed_columns, sort_order)
else:
column = ScrapedColumnMetadata(
name=row['col_name'],
description=row['comment'] if row['comment'] else None,
data_type=row['data_type'],
sort_order=sort_order,
badges=None
)
parsed_columns[row['col_name']] = column
sort_order += 1
else:
if row['data_type'] in parsed_columns:
LOGGER.debug(f"Adding partition column table for {row['data_type']}")
parsed_columns[row['data_type']].set_is_partition(True)
parsed_columns[row['data_type']].set_badges([PARTITION_BADGE])
elif row['col_name'] in parsed_columns:
LOGGER.debug(f"Adding partition column table for {row['col_name']}")
parsed_columns[row['col_name']].set_is_partition(True)
parsed_columns[row['col_name']].set_badges([PARTITION_BADGE])
return list(parsed_columns.values())
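    # Illustrative `DESCRIBE <schema>.<table>` output as parsed above (placeholders; the exact
    # layout differs between Spark 2 and Spark 3, as noted in the loop):
    #
    #   col_name                   data_type   comment
    #   id                         bigint      primary id   <- regular column rows
    #   ds                         date        null
    #   # Partition Information                             <- '#'/blank rows flip partition_cols
    #   # col_name                 data_type
    #   ds                         date                     <- matches an already-parsed column,
    #                                                           which then gets the partition badge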
def _iterate_complex_type(self,
parent: str,
curr_field: Union[StructType, StructField, ArrayType, MapType],
parsed_columns: Dict,
total_cols: int) -> int:
col_name = parent
if self.is_struct_field(curr_field):
if len(parent) > 0:
col_name = f"{parent}.{curr_field.name}"
else:
col_name = curr_field.name
parsed_columns[col_name] = ScrapedColumnMetadata(
name=col_name,
data_type=curr_field.dataType.simpleString(),
sort_order=total_cols,
description=None,
)
total_cols += 1
if self.is_complex_delta_type(curr_field.dataType):
total_cols = self._iterate_complex_type(col_name, curr_field.dataType, parsed_columns, total_cols)
if self.is_complex_delta_type(curr_field):
if self.is_struct_type(curr_field):
for field in curr_field:
total_cols = self._iterate_complex_type(col_name, field, parsed_columns, total_cols)
elif self.is_array_type(curr_field) and self.is_complex_delta_type(curr_field.elementType):
total_cols = self._iterate_complex_type(col_name, curr_field.elementType, parsed_columns, total_cols)
elif self.is_map_type(curr_field) and self.is_complex_delta_type(curr_field.valueType):
total_cols = self._iterate_complex_type(col_name, curr_field.valueType, parsed_columns, total_cols)
return total_cols
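    # Illustrative expansion (placeholder field names): with extract_nested_columns enabled, a column
    #   address: struct<city: string, geo: struct<lat: double, lon: double>>
    # is flattened by the recursion above into separate ScrapedColumnMetadata entries, roughly:
    #   address            struct<city:string,geo:struct<lat:double,lon:double>>
    #   address.city       string
    #   address.geo        struct<lat:double,lon:double>
    #   address.geo.lat    double
    #   address.geo.lon    double
    # with sort_order incremented once per emitted entry.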
def create_table_metadata(self, table: ScrapedTableMetadata) -> TableMetadata:
'''Creates the amundsen table metadata object from the ScrapedTableMetadata object.'''
amundsen_columns = []
if table.columns:
for column in table.columns:
amundsen_columns.append(
ColumnMetadata(name=column.name,
description=column.description,
col_type=column.data_type,
sort_order=column.sort_order,
badges=column.badges)
)
description = table.get_table_description()
return TableMetadata(self._db,
self._cluster,
table.schema,
table.table,
description,
amundsen_columns,
table.is_view)
def create_table_last_updated(self, table: ScrapedTableMetadata) -> Optional[TableLastUpdated]:
'''Creates the amundsen table last updated metadata object from the ScrapedTableMetadata object.'''
last_modified = table.get_last_modified()
if last_modified:
return TableLastUpdated(table_name=table.table,
last_updated_time_epoch=int(last_modified.timestamp()),
schema=table.schema,
db=self._db,
cluster=self._cluster)
else:
return None
def is_complex_delta_type(self, delta_type: Any) -> bool:
return isinstance(delta_type, StructType) or \
isinstance(delta_type, ArrayType) or \
isinstance(delta_type, MapType)
def is_struct_type(self, delta_type: Any) -> bool:
return isinstance(delta_type, StructType)
def is_struct_field(self, delta_type: Any) -> bool:
return isinstance(delta_type, StructField)
def is_array_type(self, delta_type: Any) -> bool:
return isinstance(delta_type, ArrayType)
def is_map_type(self, delta_type: Any) -> bool:
return isinstance(delta_type, MapType)
def create_table_watermarks(self, table: ScrapedTableMetadata) -> Optional[List[Tuple[Watermark, Watermark]]]: # noqa c901
"""
Creates the watermark objects that reflect the highest and lowest values in the partition columns
"""
def _is_show_partitions_supported(t: ScrapedTableMetadata) -> bool:
try:
self.spark.sql(f'show partitions {t.schema}.{t.table}')
return True
except Exception as e:
# pyspark.sql.utils.AnalysisException: SHOW PARTITIONS is not allowed on a table that is not partitioned
LOGGER.warning(e)
return False
def _fetch_minmax(table: ScrapedTableMetadata, partition_column: str) -> Tuple[str, str]:
LOGGER.info(f'Fetching partition info for {partition_column} in {table.schema}.{table.table}')
min_water = ""
max_water = ""
try:
if is_show_partitions_supported:
LOGGER.info('Using SHOW PARTITION')
min_water = str(
self
.spark
.sql(f'show partitions {table.schema}.{table.table}')
.orderBy(partition_column, ascending=True)
.first()[partition_column])
max_water = str(
self
.spark
.sql(f'show partitions {table.schema}.{table.table}')
.orderBy(partition_column, ascending=False)
.first()[partition_column])
else:
LOGGER.info('Using DESCRIBE EXTENDED')
part_info = (self
.spark
.sql(f'describe extended {table.schema}.{table.table} {partition_column}')
.collect()
)
minmax = {}
for mm in list(filter(lambda x: x['info_name'] in ['min', 'max'], part_info)):
minmax[mm['info_name']] = mm['info_value']
min_water = minmax['min']
max_water = minmax['max']
except Exception as e:
LOGGER.warning(f'Failed fetching partition watermarks: {e}')
return max_water, min_water
if not table.table_detail:
LOGGER.info(f'No table details found in {table}, skipping')
return None
if 'partitionColumns' not in table.table_detail or len(table.table_detail['partitionColumns']) < 1:
LOGGER.info(f'No partitions found in {table}, skipping')
return None
is_show_partitions_supported: bool = _is_show_partitions_supported(table)
if not is_show_partitions_supported:
LOGGER.info('Analyzing table, this can take a while...')
partition_columns = ','.join(table.table_detail['partitionColumns'])
self.spark.sql(
f"analyze table {table.schema}.{table.table} compute statistics for columns {partition_columns}")
# It makes little sense to get watermarks from a string value, with no concept of high and low.
# Just imagine a dataset with a partition by country...
valid_types = ['int', 'float', 'date', 'datetime']
if table.columns:
_table_columns = table.columns
else:
_table_columns = []
columns_with_valid_type = list(map(lambda n: n.name,
filter(lambda d: str(d.data_type).lower() in valid_types, _table_columns)
)
)
r = []
for partition_column in table.table_detail['partitionColumns']:
if partition_column not in columns_with_valid_type:
continue
last, first = _fetch_minmax(table, partition_column)
low = Watermark(
create_time=table.table_detail['createdAt'],
database=self._db,
schema=table.schema,
table_name=table.table,
part_name=f'{partition_column}={first}',
part_type='low_watermark',
cluster=self._cluster)
high = Watermark(
create_time=table.table_detail['createdAt'],
database=self._db,
schema=table.schema,
table_name=table.table,
part_name=f'{partition_column}={last}',
part_type='high_watermark',
cluster=self._cluster)
r.append((high, low))
return r
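

# Minimal standalone usage sketch (illustrative only, not part of the original module): wires the
# extractor to a Hive-metastore-backed SparkSession and logs whatever it emits. The app name,
# cluster, and schema values are placeholders.
if __name__ == '__main__':
    spark = (SparkSession.builder
             .appName('delta_lake_metadata_extraction')
             .enableHiveSupport()
             .getOrCreate())
    extractor = DeltaLakeMetadataExtractor()
    extractor.set_spark(spark)
    extractor.init(ConfigFactory.from_dict({
        DeltaLakeMetadataExtractor.CLUSTER_KEY: 'demo_cluster',
        DeltaLakeMetadataExtractor.SCHEMA_LIST_KEY: ['demo_schema'],
    }))
    record = extractor.extract()
    while record:
        LOGGER.info(record)
        record = extractor.extract()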