Skip to content

Commit

Permalink
Added batched writes to BigQuery (BQ) for tag exports
Browse files (browse the repository at this point in the history)
  • Loading branch information
shirleycohen committed Feb 14, 2023
1 parent 74a2f8f commit ee92750
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
26 changes: 9 additions & 17 deletions BigQueryUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,40 +63,32 @@ def truncate_report_tables(self, project, dataset):
else:
return False

# API method used by tag export function
def insert_exported_record(self, target_table_id, project, dataset, table, column, tag_template, tag_field, tag_value):
# API method used by the tag export function to insert a batch of records in a single load job
def insert_exported_records(self, target_table_id, records):

print("*** inside BigQueryUtils.insert_exported_record() ***")
print('*** insert_exported_records into', target_table_id)

success = True
current_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " UTC"


if target_table_id.endswith('catalog_report_column_tags'):
schema = self.get_report_column_schema()
rows_to_insert = [
{"project": project, "dataset": dataset, "table": table, "column": column, "tag_template": tag_template, "tag_field": tag_field, "tag_value": tag_value, "export_time": current_ts},
]

rows_to_insert = records

elif target_table_id.endswith('catalog_report_table_tags'):
schema = self.get_report_table_schema()
rows_to_insert = [
{"project": project, "dataset": dataset, "table": table, "tag_template": tag_template, "tag_field": tag_field, "tag_value": tag_value, "export_time": current_ts},
]
rows_to_insert = records

elif target_table_id.endswith('catalog_report_dataset_tags'):
schema = self.get_report_dataset_schema()
rows_to_insert = [
{"project": project, "dataset": dataset, "tag_template": tag_template, "tag_field": tag_field, "tag_value": tag_value, "export_time": current_ts},
]


rows_to_insert = records

job_config = bigquery.LoadJobConfig(schema=schema, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON)
table_ref = bigquery.table.TableReference.from_string(target_table_id)

try:
job = self.client.load_table_from_json(rows_to_insert, table_ref, job_config=job_config)
print('Inserted record into reporting table')
print('job errors:', job.errors)

except Exception as e:

Expand Down
47 changes: 34 additions & 13 deletions DataCatalogUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,10 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe
print('target_region:', target_region)
print('uri:', uri)

column_tag_records = []
table_tag_records = []
dataset_tag_records = []

export_status = constants.SUCCESS
bqu = bq.BigQueryUtils(target_region)

Expand All @@ -1255,10 +1259,10 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe
tagged_dataset = uri.split('/')[2]

if '/tables/' in uri:
target_table_id = target_project + '.' + target_dataset + '.' + 'catalog_report_table_tags'
target_table_id = 'catalog_report_table_tags'
tagged_table = uri.split('/')[4]
else:
target_table_id = target_project + '.' + target_dataset + '.' + 'catalog_report_dataset_tags'
target_table_id = 'catalog_report_dataset_tags'
tagged_table = None

bigquery_resource = '//bigquery.googleapis.com/projects/' + uri
Expand Down Expand Up @@ -1287,11 +1291,12 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe
self.template_path = tag.template
template_fields = self.get_template()

if tag.column and tag.column != '':
if tag.column and len(tag.column) > 1:
tagged_column = tag.column
target_table_id = target_project + '.' + target_dataset + '.' + 'catalog_report_column_tags'
target_table_id = 'catalog_report_column_tags'
else:
tagged_column = None
target_table_id = 'catalog_report_table_tags'

for template_field in template_fields:

Expand All @@ -1302,8 +1307,6 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe
continue

tagged_field = tag.fields[field_id]

print('tagged_field:', tagged_field)
tagged_field_str = str(tagged_field)
tagged_field_split = tagged_field_str.split('\n')
#print('tagged_field_split:', tagged_field_split)
Expand All @@ -1314,7 +1317,7 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe
if '_value:' in split:
start_index = split.index(':', 0) + 1
#print('start_index:', start_index)
field_value = split[start_index:].strip().replace('"', '').replace('<br>', ', ')
field_value = split[start_index:].strip().replace('"', '').replace('<br>', ',')
print('extracted field_value:', field_value)
break
elif 'enum_value' in split:
Expand All @@ -1324,13 +1327,31 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe

split_index += 1

# write record to BQ
success = bqu.insert_exported_record(target_table_id, tagged_project, tagged_dataset, tagged_table, tagged_column, self.template_id, field_id, field_value)
# format record to be written
current_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " UTC"

if success == False:
print('Error occurred while writing to BigQuery report table.')
export_status = constants.ERROR

if target_table_id in 'catalog_report_column_tags':
column_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "column": tagged_column, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

elif target_table_id in 'catalog_report_table_tags':
table_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

elif target_table_id in 'catalog_report_dataset_tags':
dataset_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts})

# write exported records to BQ
if len(dataset_tag_records) > 0:
target_table_id = target_project + '.' + target_dataset + '.catalog_report_dataset_tags'
success = bqu.insert_exported_records(target_table_id, dataset_tag_records)

if len(table_tag_records) > 0:
target_table_id = target_project + '.' + target_dataset + '.catalog_report_table_tags'
success = bqu.insert_exported_records(target_table_id, table_tag_records)

if len(column_tag_records) > 0:
target_table_id = target_project + '.' + target_dataset + '.catalog_report_column_tags'
success = bqu.insert_exported_records(target_table_id, column_tag_records)

return export_status


Expand Down

0 comments on commit ee92750

Please sign in to comment.