diff --git a/BigQueryUtils.py b/BigQueryUtils.py index 7251e63..9741316 100644 --- a/BigQueryUtils.py +++ b/BigQueryUtils.py @@ -63,40 +63,32 @@ def truncate_report_tables(self, project, dataset): else: return False - # API method used by tag export function - def insert_exported_record(self, target_table_id, project, dataset, table, column, tag_template, tag_field, tag_value): + # API method used by tag export function to insert records + def insert_exported_records(self, target_table_id, records): - print("*** inside BigQueryUtils.insert_exported_record() ***") + print('*** insert_exported_records into', target_table_id) success = True - current_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " UTC" - if target_table_id.endswith('catalog_report_column_tags'): schema = self.get_report_column_schema() - rows_to_insert = [ - {"project": project, "dataset": dataset, "table": table, "column": column, "tag_template": tag_template, "tag_field": tag_field, "tag_value": tag_value, "export_time": current_ts}, - ] - + rows_to_insert = records + elif target_table_id.endswith('catalog_report_table_tags'): schema = self.get_report_table_schema() - rows_to_insert = [ - {"project": project, "dataset": dataset, "table": table, "tag_template": tag_template, "tag_field": tag_field, "tag_value": tag_value, "export_time": current_ts}, - ] + rows_to_insert = records elif target_table_id.endswith('catalog_report_dataset_tags'): schema = self.get_report_dataset_schema() - rows_to_insert = [ - {"project": project, "dataset": dataset, "tag_template": tag_template, "tag_field": tag_field, "tag_value": tag_value, "export_time": current_ts}, - ] - - + rows_to_insert = records + job_config = bigquery.LoadJobConfig(schema=schema, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON) table_ref = bigquery.table.TableReference.from_string(target_table_id) try: job = self.client.load_table_from_json(rows_to_insert, table_ref, job_config=job_config) print('Inserted record into reporting table') + print('job errors:', job.errors) except Exception as e: diff --git a/DataCatalogUtils.py b/DataCatalogUtils.py index 9e07244..fdc138d 100644 --- a/DataCatalogUtils.py +++ b/DataCatalogUtils.py @@ -1243,6 +1243,10 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe print('target_region:', target_region) print('uri:', uri) + column_tag_records = [] + table_tag_records = [] + dataset_tag_records = [] + export_status = constants.SUCCESS bqu = bq.BigQueryUtils(target_region) @@ -1255,10 +1259,10 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe tagged_dataset = uri.split('/')[2] if '/tables/' in uri: - target_table_id = target_project + '.' + target_dataset + '.' + 'catalog_report_table_tags' + target_table_id = 'catalog_report_table_tags' tagged_table = uri.split('/')[4] else: - target_table_id = target_project + '.' + target_dataset + '.' + 'catalog_report_dataset_tags' + target_table_id = 'catalog_report_dataset_tags' tagged_table = None bigquery_resource = '//bigquery.googleapis.com/projects/' + uri @@ -1287,11 +1291,12 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe self.template_path = tag.template template_fields = self.get_template() - if tag.column and tag.column != '': + if tag.column and len(tag.column) > 1: tagged_column = tag.column - target_table_id = target_project + '.' + target_dataset + '.' + 'catalog_report_column_tags' + target_table_id = 'catalog_report_column_tags' else: tagged_column = None + target_table_id = 'catalog_report_table_tags' for template_field in template_fields: @@ -1302,8 +1307,6 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe continue tagged_field = tag.fields[field_id] - - print('tagged_field:', tagged_field) tagged_field_str = str(tagged_field) tagged_field_split = tagged_field_str.split('\n') #print('tagged_field_split:', tagged_field_split) @@ -1314,7 +1317,7 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe if '_value:' in split: start_index = split.index(':', 0) + 1 #print('start_index:', start_index) - field_value = split[start_index:].strip().replace('"', '').replace('
', ', ') + field_value = split[start_index:].strip().replace('"', '').replace('
', ',') print('extracted field_value:', field_value) break elif 'enum_value' in split: @@ -1324,13 +1327,31 @@ def apply_export_config(self, config_uuid, target_project, target_dataset, targe split_index += 1 - # write record to BQ - success = bqu.insert_exported_record(target_table_id, tagged_project, tagged_dataset, tagged_table, tagged_column, self.template_id, field_id, field_value) + # format record to be written + current_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " UTC" - if success == False: - print('Error occurred while writing to BigQuery report table.') - export_status = constants.ERROR - + if target_table_id in 'catalog_report_column_tags': + column_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "column": tagged_column, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts}) + + elif target_table_id in 'catalog_report_table_tags': + table_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "table": tagged_table, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts}) + + elif target_table_id in 'catalog_report_dataset_tags': + dataset_tag_records.append({"project": tagged_project, "dataset": tagged_dataset, "tag_template": self.template_id, "tag_field": field_id, "tag_value": field_value, "export_time": current_ts}) + + # write exported records to BQ + if len(dataset_tag_records) > 0: + target_table_id = target_project + '.' + target_dataset + '.catalog_report_dataset_tags' + success = bqu.insert_exported_records(target_table_id, dataset_tag_records) + + if len(table_tag_records) > 0: + target_table_id = target_project + '.' + target_dataset + '.catalog_report_table_tags' + success = bqu.insert_exported_records(target_table_id, table_tag_records) + + if len(column_tag_records) > 0: + target_table_id = target_project + '.' + target_dataset + '.catalog_report_column_tags' + success = bqu.insert_exported_records(target_table_id, column_tag_records) + return export_status