Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gcp_variant_transforms/bq_to_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from gcp_variant_transforms.beam_io import vcf_header_io
from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.libs import bigquery_util
from gcp_variant_transforms.libs import bigquery_vcf_schema_converter
from gcp_variant_transforms.libs import schema_converter
from gcp_variant_transforms.libs import genomic_region_parser
from gcp_variant_transforms.libs import vcf_file_composer
from gcp_variant_transforms.options import variant_transform_options
Expand Down Expand Up @@ -140,7 +140,7 @@ def _write_vcf_meta_info(input_table,
# type: (str, str, bool) -> None
"""Writes the meta information generated from BigQuery schema."""
header_fields = (
bigquery_vcf_schema_converter.generate_header_fields_from_schema(
schema_converter.generate_header_fields_from_schema(
_get_schema(input_table), allow_incompatible_schema))
write_header_fn = vcf_header_io.WriteVcfHeaderFn(representative_header_file)
write_header_fn.process(header_fields, _VCF_VERSION_LINE)
Expand Down
37 changes: 36 additions & 1 deletion gcp_variant_transforms/libs/bigquery_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import enum
import re
from typing import Tuple # pylint: disable=unused-import
from typing import List, Tuple, Union # pylint: disable=unused-import

from vcf import parser

Expand Down Expand Up @@ -54,6 +54,17 @@ class TableFieldConstants(object):
MODE_REPEATED = 'REPEATED'


class AvroConstants(object):
  """Names used when building Avro schemas in their JSON representation."""
  # Keys of a schema/field dict.
  TYPE = 'type'
  NAME = 'name'
  FIELDS = 'fields'
  ITEMS = 'items'
  # Avro type names.
  ARRAY = 'array'
  RECORD = 'record'
  NULL = 'null'


class _SupportedTableFieldType(enum.Enum):
"""The supported BigQuery field types.

Expand Down Expand Up @@ -83,6 +94,16 @@ class _SupportedTableFieldType(enum.Enum):
TableFieldConstants.TYPE_BOOLEAN: _VcfHeaderTypeConstants.FLAG
}

# A map to convert from BigQuery types to their equivalent Avro types.
# BigQuery INTEGER/FLOAT are 64-bit (INT64/FLOAT64), hence 'long'/'double'.
_BIG_QUERY_TYPE_TO_AVRO_TYPE_MAP = {
    # This list is not exhaustive but covers all of the types we currently use.
    TableFieldConstants.TYPE_INTEGER: 'long',
    TableFieldConstants.TYPE_STRING: 'string',
    TableFieldConstants.TYPE_FLOAT: 'double',
    TableFieldConstants.TYPE_BOOLEAN: 'boolean',
    TableFieldConstants.TYPE_RECORD: 'record'
}

# A map to convert from BigQuery types to Python types.
_BIG_QUERY_TYPE_TO_PYTHON_TYPE_MAP = {
TableFieldConstants.TYPE_INTEGER: int,
Expand Down Expand Up @@ -156,3 +177,17 @@ def get_vcf_num_from_bigquery_schema(bigquery_mode, bigquery_type):
def get_supported_bigquery_schema_types():
  """Returns a list of all BigQuery field types this package supports."""
  return [field_type.value for field_type in _SupportedTableFieldType]


def get_avro_type_from_bigquery_type_mode(bigquery_type, bigquery_mode):
  # type: (str, str) -> Union[str, List[str]]
  """Returns the Avro equivalent of the given BigQuery type and mode.

  Args:
    bigquery_type: A BigQuery type, e.g. `TableFieldConstants.TYPE_STRING`.
    bigquery_mode: A BigQuery mode, e.g. `TableFieldConstants.MODE_NULLABLE`.

  Returns:
    The Avro type name as a string for non-NULLABLE modes, or a two element
    list `[avro_type, 'null']` for NULLABLE mode (a union in Avro's JSON
    schema format).

  Raises:
    ValueError: If `bigquery_type` has no known Avro equivalent.
  """
  if bigquery_type not in _BIG_QUERY_TYPE_TO_AVRO_TYPE_MAP:
    raise ValueError('Unknown Avro equivalent for type {}'.format(
        bigquery_type))
  avro_type = _BIG_QUERY_TYPE_TO_AVRO_TYPE_MAP[bigquery_type]
  if bigquery_mode == TableFieldConstants.MODE_NULLABLE:
    # A nullable type in the Avro schema is represented by a Union which is
    # equivalent to an array in JSON format.
    return [avro_type, AvroConstants.NULL]
  return avro_type
2 changes: 1 addition & 1 deletion gcp_variant_transforms/libs/bigquery_vcf_data_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def get_rows(self,

def _get_call_record(
self,
call, # type: VariantCall
call, # type: vcfio.VariantCall
call_record_schema_descriptor,
# type: bigquery_schema_descriptor.SchemaDescriptor
allow_incompatible_records, # type: bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Handles the conversion between BigQuery schema and VCF header."""
"""Handles the conversion between BigQuery/Avro schema and VCF header."""

from __future__ import absolute_import

from collections import OrderedDict
from typing import Any, Dict, Union # pylint: disable=unused-import
import json
import logging
from typing import Dict, Union # pylint: disable=unused-import

from apache_beam.io.gcp.internal.clients import bigquery
from apitools.base.protorpclite import messages # pylint: disable=unused-import

from vcf import parser

from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.beam_io import vcf_header_io
from gcp_variant_transforms.libs import bigquery_schema_descriptor # pylint: disable=unused-import
from gcp_variant_transforms.libs import bigquery_util
from gcp_variant_transforms.libs import processed_variant # pylint: disable=unused-import
from gcp_variant_transforms.libs import bigquery_sanitizer
from gcp_variant_transforms.libs import vcf_field_conflict_resolver # pylint: disable=unused-import
from gcp_variant_transforms.libs import vcf_reserved_fields
from gcp_variant_transforms.libs.annotation import annotation_parser
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
Expand Down Expand Up @@ -184,6 +185,144 @@ def generate_schema_from_header_fields(
return schema


def _convert_repeated_field_to_avro_array(field, fields_list):
  # type: (messages.MessageField, List[Dict]) -> List
  """Converts a REPEATED BigQuery field to a nullable Avro array type.

  Args:
    field: The repeated BigQuery field to convert.
    fields_list: Avro dicts for the already-converted child fields of `field`;
      only used when `field` is a repeated RECORD.

  Returns:
    A union (JSON list) of 'null' and the Avro array dict, e.g.
    `['null', {'type': 'array', 'items': ...}]`.
  """
  array_dict = {
      bigquery_util.AvroConstants.TYPE: bigquery_util.AvroConstants.ARRAY
  }
  if field.fields:
    # Repeated RECORD: the array items are themselves records built from the
    # pre-converted child fields.
    array_dict[bigquery_util.AvroConstants.ITEMS] = {
        bigquery_util.AvroConstants.TYPE: bigquery_util.AvroConstants.RECORD,
        bigquery_util.AvroConstants.NAME: field.name,
        bigquery_util.AvroConstants.FIELDS: fields_list
    }
  else:
    # Repeated scalar: items carry the Avro equivalent of the field's type.
    array_dict[bigquery_util.AvroConstants.ITEMS] = {
        bigquery_util.AvroConstants.NAME: field.name,
        bigquery_util.AvroConstants.TYPE:
            bigquery_util.get_avro_type_from_bigquery_type_mode(
                field.type, field.mode)
    }
  # All repeated fields are nullable.
  return [bigquery_util.AvroConstants.NULL, array_dict]


def _convert_field_to_avro_dict(field):
  # type: (messages.MessageField) -> Dict
  """Returns the Avro dict for one BigQuery field, recursing into children."""
  child_fields = []
  if field.fields:
    child_fields = [
        _convert_field_to_avro_dict(sub_field) for sub_field in field.fields]
  avro_field = {bigquery_util.AvroConstants.NAME: field.name}
  if field.mode == bigquery_util.TableFieldConstants.MODE_REPEATED:
    # TODO(bashir2): In this case both the name of the array and also
    # individual records in the array is f.name. Make sure this is according
    # to Avro spec then remove this TODO.
    avro_field[bigquery_util.AvroConstants.TYPE] = (
        _convert_repeated_field_to_avro_array(field, child_fields))
  else:
    avro_field[bigquery_util.AvroConstants.TYPE] = (
        bigquery_util.get_avro_type_from_bigquery_type_mode(
            field.type, field.mode))
    if field.fields:
      avro_field[bigquery_util.AvroConstants.FIELDS] = child_fields
  return avro_field


def _convert_schema_to_avro_dict(schema):
  # type: (bigquery.TableSchema) -> Dict
  """Returns the whole table `schema` as one top-level Avro `record` dict."""
  # TODO(bashir2): Check if we need `namespace` and `name` at the top level.
  return {
      bigquery_util.AvroConstants.NAME: 'TBD',
      bigquery_util.AvroConstants.TYPE: bigquery_util.AvroConstants.RECORD,
      bigquery_util.AvroConstants.FIELDS: [
          _convert_field_to_avro_dict(f) for f in schema.fields],
  }


def convert_table_schema_to_json_avro_schema(schema):
  # type: (bigquery.TableSchema) -> str
  """Returns the Avro equivalent of the given `schema` in json format.

  For writing to Avro files, the only piece that is different is the schema. In
  other words the exact same `Dict` that represents a BigQuery row can be
  written to an Avro file if the schema of that file is equivalent to the
  BigQuery Table schema. This function generates that equivalent Avro schema.

  For details of Avro schema spec, see:
  https://avro.apache.org/docs/1.8.2/spec.html

  For concrete examples relevant to our BigQuery schema, consider the following
  three NULLABLE fields (a NULLABLE type becomes a `type` union-array where
  `null` is one of the members; BigQuery INTEGER maps to Avro `long`):

  {
    "fields": [
      {
        "type": [ "string", "null"],
        "name": "reference_name"
      },
      {
        "type": ["long", "null"],
        "name": "start_position"
      },
      {
        "type": ["long", "null"],
        "name": "end_position"
      },
      ...
    ],
    "type": "record",
    "name": "TBD"
  }

  Note that the whole schema is represented as a `record` which has several
  `fields`. In the above example, only the first three `fields` are shown.

  `REPEATED` fields, specially `REPEATED` `RECORD` fields, are a little more
  complex in Avro schema format: the array itself is wrapped in a union with
  `null` (all repeated fields are treated as nullable, with `null` listed
  first). Here is one example for `alternate_bases`:
  {
    "type": ["null", {
      "items": {
        "type": "record",
        "name": "alternate_bases",
        "fields": [
          {
            "type": ["string", "null"],
            "name": "alt"
          },
          {
            "type": ["double", "null"],
            "name": "AF"
          }
        ]
      },
      "type": "array"
    }],
    "name": "alternate_bases"
  },

  Args:
    schema: This is the BigQuery table schema that is generated from input VCFs.

  Returns:
    The Avro schema as a json string.

  Raises:
    ValueError: If `schema` is not a `bigquery.TableSchema` instance.
  """
  if not isinstance(schema, bigquery.TableSchema):
    raise ValueError(
        'Expected an instance of bigquery.TableSchema got {}'.format(
            type(schema)))
  schema_dict = _convert_schema_to_avro_dict(schema)
  json_str = json.dumps(schema_dict)
  logging.info('The Avro schema is: %s', json_str)
  return json_str


def generate_header_fields_from_schema(schema, allow_incompatible_schema=False):
# type: (bigquery.TableSchema, bool) -> vcf_header_io.VcfHeader
"""Returns header fields converted from BigQuery schema.
Expand Down
Loading