Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions gcp_variant_transforms/libs/bigquery_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
"""Constants and simple utility functions related to BigQuery."""

import enum
import exceptions
import re
from typing import List, Tuple, Union # pylint: disable=unused-import

from apache_beam.io.gcp.internal.clients import bigquery
from apitools.base.py import exceptions
from oauth2client.client import GoogleCredentials
from vcf import parser

from gcp_variant_transforms.beam_io import vcf_header_io
Expand Down Expand Up @@ -191,3 +195,88 @@ def get_avro_type_from_bigquery_type_mode(bigquery_type, bigquery_mode):
return [avro_type, AvroConstants.NULL]
else:
return avro_type

def update_bigquery_schema_on_append(schema_fields, output_table):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Please add a docstring for this function since it is public now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

# type: (List[bigquery.TableFieldSchema], str) -> None
"""Update BQ schema by combining existing one with a new one, if possible.

If table does not exist, do not need to update the schema.
TODO (yifangchen): Move the logic into validate().
"""
output_table_re_match = re.match(
r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$',
output_table)
credentials = GoogleCredentials.get_application_default().create_scoped(
['https://www.googleapis.com/auth/bigquery'])
client = bigquery.BigqueryV2(credentials=credentials)
try:
project_id = output_table_re_match.group('project')
dataset_id = output_table_re_match.group('dataset')
table_id = output_table_re_match.group('table')
existing_table = client.tables.Get(bigquery.BigqueryTablesGetRequest(
projectId=project_id,
datasetId=dataset_id,
tableId=table_id))
except exceptions.HttpError:
return

new_schema = bigquery.TableSchema()
new_schema.fields = _get_merged_field_schemas(existing_table.schema.fields,
schema_fields)
existing_table.schema = new_schema
try:
client.tables.Update(bigquery.BigqueryTablesUpdateRequest(
projectId=project_id,
datasetId=dataset_id,
table=existing_table,
tableId=table_id))
except exceptions.HttpError as e:
raise RuntimeError('BigQuery schema update failed: %s' % str(e))


def _get_merged_field_schemas(
field_schemas_1, # type: List[bigquery.TableFieldSchema]
field_schemas_2 # type: List[bigquery.TableFieldSchema]
):
# type: (...) -> List[bigquery.TableFieldSchema]
"""Merges the `field_schemas_1` and `field_schemas_2`.

Args:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Leave one empty line between Args, Returns, and Raises. See one example. I know this is how the old code looks like, can you also help refining them? :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, Done.

field_schemas_1: A list of `TableFieldSchema`.
field_schemas_2: A list of `TableFieldSchema`.

Returns:
A new schema with new fields from `field_schemas_2` appended to
`field_schemas_1`.

Raises:
ValueError: If there are fields with the same name, but different modes or
different types.
"""
existing_fields = {} # type: Dict[str, bigquery.TableFieldSchema]
merged_field_schemas = [] # type: List[bigquery.TableFieldSchema]
for field_schema in field_schemas_1:
existing_fields.update({field_schema.name: field_schema})
merged_field_schemas.append(field_schema)

for field_schema in field_schemas_2:
if field_schema.name not in existing_fields.keys():
merged_field_schemas.append(field_schema)
else:
existing_field_schema = existing_fields.get(field_schema.name)
if field_schema.mode != existing_field_schema.mode:
raise ValueError(
'The mode of field {} is not compatible. The original mode is {}, '
'and the new mode is {}.'.format(field_schema.name,
existing_field_schema.mode,
field_schema.mode))
if field_schema.type != existing_field_schema.type:
raise ValueError(
'The type of field {} is not compatible. The original type is {}, '
'and the new type is {}.'.format(field_schema.name,
existing_field_schema.type,
field_schema.type))
if field_schema.type == TableFieldConstants.TYPE_RECORD:
existing_field_schema.fields = _get_merged_field_schemas(
existing_field_schema.fields, field_schema.fields)
return merged_field_schemas
Loading