From 935be7751a6fcda3030b6b980173df52485a68d0 Mon Sep 17 00:00:00 2001 From: Nima Mousavi <4221028+nmousavi@users.noreply.github.com> Date: Thu, 8 Mar 2018 14:27:03 -0500 Subject: [PATCH 1/5] Define class SchemaDescriptor. Tested: unit test --- .../libs/schema_descriptor.py | 69 ++++++++++ .../libs/schema_descriptor_test.py | 126 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 gcp_variant_transforms/libs/schema_descriptor.py create mode 100644 gcp_variant_transforms/libs/schema_descriptor_test.py diff --git a/gcp_variant_transforms/libs/schema_descriptor.py b/gcp_variant_transforms/libs/schema_descriptor.py new file mode 100644 index 000000000..3604de0dc --- /dev/null +++ b/gcp_variant_transforms/libs/schema_descriptor.py @@ -0,0 +1,69 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A dict based description for BigQuery's schema.""" + +from __future__ import absolute_import + +from collections import namedtuple +from apache_beam.io.gcp.internal.clients import bigquery #pylint: disable=unused-import + +__all__ = ['SchemaDescriptor'] + +#Stores data about a simple field (not a record) in BigQuery Schema. +FieldDescriptor = namedtuple('FieldDescriptor', ['type', 'mode']) + +class SchemaDescriptor(object): + """A dict based description for :class:`bigquery.TableSchema` object.""" + + def __init__(self, table_schema): + # type: (bigquery.TableSchema) -> None + + # Dict of (field_name, :class:`FieldDescriptor`). + self.field_descriptor_dict = {} + # Dict of (record_name, :class:`SchemaDescriptor`). + self.schema_descriptor_dict = {} + + self._extract_all_descriptors(table_schema) + + def _extract_all_descriptors(self, table_schema): + # type: (bigquery.TableSchema) -> None + """Extracts descriptor for fields and records in `table_schema`.""" + + for field in table_schema.fields: + if field.fields: + # Record field. + self.schema_descriptor_dict[field.name] = SchemaDescriptor(field) + else: + # Simple field. + self.field_descriptor_dict[field.name] = FieldDescriptor( + type=field.type, mode=field.mode) + + def get_field_descriptor(self, field_name): + # type: (str) -> FieldDescriptor + + if field_name in self.field_descriptor_dict: + return self.field_descriptor_dict[field_name] + else: + raise ValueError("Field descriptor not found. Not such field in Bigquery " + "schema: {}".format(field_name)) + + def get_record_schema_descriptor(self, record_name): + # type: (str) -> SchemaDescriptor + + if record_name in self.schema_descriptor_dict: + return self.schema_descriptor_dict[record_name] + else: + raise ValueError("Schema descriptor not found. Not such record " + "in Bigquery schema: {}".format(record_name)) diff --git a/gcp_variant_transforms/libs/schema_descriptor_test.py b/gcp_variant_transforms/libs/schema_descriptor_test.py new file mode 100644 index 000000000..7466a2d1a --- /dev/null +++ b/gcp_variant_transforms/libs/schema_descriptor_test.py @@ -0,0 +1,126 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for infer_variant_header module.""" + + +from __future__ import absolute_import + + +import unittest + + +from apache_beam.io.gcp.internal.clients import bigquery +from gcp_variant_transforms.libs import bigquery_util +from gcp_variant_transforms.libs import schema_descriptor + + +class SchemaDescriptorTest(unittest.TestCase): + """Test case for :class:`SchemaDescriptor`""" + + def setUp(self): + self._boolean = bigquery_util.TableFieldConstants.TYPE_BOOLEAN + self._float = bigquery_util.TableFieldConstants.TYPE_FLOAT + self._integer = bigquery_util.TableFieldConstants.TYPE_INTEGER + self._record = bigquery_util.TableFieldConstants.TYPE_RECORD + self._string = bigquery_util.TableFieldConstants.TYPE_STRING + + self._nullable = bigquery_util.TableFieldConstants.MODE_NULLABLE + self._repeated = bigquery_util.TableFieldConstants.MODE_REPEATED + + def _get_table_schema(self): + # type (None) -> bigquery.TableSchema + schema = bigquery.TableSchema() + schema.fields.append(bigquery.TableFieldSchema( + name='field_1', type=self._string, mode=self._nullable, + description='foo desc')) + schema.fields.append(bigquery.TableFieldSchema( + name='field_2', type=self._integer, mode=self._repeated, + description='foo desc')) + # Record field. + record_field = bigquery.TableFieldSchema( + name='record_1', type=self._record, mode=self._repeated, + description='foo desc') + record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-field_1', type=self._boolean, mode=self._nullable, + description='foo desc')) + record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-field_2', type=self._float, mode=self._repeated, + description='foo desc')) + # Record field, two level deep. + deep_record_field = bigquery.TableFieldSchema( + name='record_1-record_2', type=self._record, mode=self._repeated, + description='foo desc') + deep_record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-record_2-field_1', type=self._boolean, + mode=self._nullable, description='foo desc')) + + record_field.fields.append(deep_record_field) + schema.fields.append(record_field) + return schema + + def _get_schema_descriptor(self): + return schema_descriptor.SchemaDescriptor(self._get_table_schema()) + + def test_non_existence_field(self): + schema = self._get_schema_descriptor() + with self.assertRaises(ValueError): + schema.get_field_descriptor('non_existence_field') + self.fail('Non existence field should throw an exceprion') + + def test_non_existence_record(self): + schema = self._get_schema_descriptor() + with self.assertRaises(ValueError): + schema.get_record_schema_descriptor('non_existence_record') + self.fail('Non existence field should throw an exceprion') + + + def test_field_descriptor_at_first_level(self): + print self._get_table_schema() + schema = self._get_schema_descriptor() + + self.assertEqual( + schema.get_field_descriptor('field_1'), + schema_descriptor.FieldDescriptor( + type=self._string, mode=self._nullable)) + self.assertEqual( + schema.get_field_descriptor('field_2'), + schema_descriptor.FieldDescriptor( + type=self._integer, mode=self._repeated)) + + def test_field_descriptor_at_second_level(self): + main_schema = self._get_schema_descriptor() + record_schema = main_schema.get_record_schema_descriptor('record_1') + + self.assertEqual( + record_schema.get_field_descriptor('record_1-field_1'), + schema_descriptor.FieldDescriptor( + type=self._boolean, mode=self._nullable)) + self.assertEqual( + record_schema.get_field_descriptor('record_1-field_2'), + schema_descriptor.FieldDescriptor( + type=self._float, mode=self._repeated)) + + def test_field_descriptor_at_third_level(self): + main_schema = self._get_schema_descriptor() + parent_record_schema = main_schema.get_record_schema_descriptor( + 'record_1') + child_record_schema = parent_record_schema.get_record_schema_descriptor( + 'record_1-record_2') + + self.assertEqual( + child_record_schema.get_field_descriptor( + 'record_1-record_2-field_1'), + schema_descriptor.FieldDescriptor( + type=self._boolean, mode=self._nullable)) From d7c785182078c638bdd229b6f2a72969a283ad55 Mon Sep 17 00:00:00 2001 From: Nima Mousavi <4221028+nmousavi@users.noreply.github.com> Date: Thu, 8 Mar 2018 14:29:57 -0500 Subject: [PATCH 2/5] Define SchemaDescriptor class. Provide serialization and lookup API for BQ schema fields. Will be used in bigquery_vcf_schema.py. --- .../libs/schema_descriptor.py | 69 ---------- .../libs/schema_descriptor_test.py | 126 ------------------ 2 files changed, 195 deletions(-) delete mode 100644 gcp_variant_transforms/libs/schema_descriptor.py delete mode 100644 gcp_variant_transforms/libs/schema_descriptor_test.py diff --git a/gcp_variant_transforms/libs/schema_descriptor.py b/gcp_variant_transforms/libs/schema_descriptor.py deleted file mode 100644 index 3604de0dc..000000000 --- a/gcp_variant_transforms/libs/schema_descriptor.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""A dict based description for BigQuery's schema.""" - -from __future__ import absolute_import - -from collections import namedtuple -from apache_beam.io.gcp.internal.clients import bigquery #pylint: disable=unused-import - -__all__ = ['SchemaDescriptor'] - -#Stores data about a simple field (not a record) in BigQuery Schema. -FieldDescriptor = namedtuple('FieldDescriptor', ['type', 'mode']) - -class SchemaDescriptor(object): - """A dict based description for :class:`bigquery.TableSchema` object.""" - - def __init__(self, table_schema): - # type: (bigquery.TableSchema) -> None - - # Dict of (field_name, :class:`FieldDescriptor`). - self.field_descriptor_dict = {} - # Dict of (record_name, :class:`SchemaDescriptor`). - self.schema_descriptor_dict = {} - - self._extract_all_descriptors(table_schema) - - def _extract_all_descriptors(self, table_schema): - # type: (bigquery.TableSchema) -> None - """Extracts descriptor for fields and records in `table_schema`.""" - - for field in table_schema.fields: - if field.fields: - # Record field. - self.schema_descriptor_dict[field.name] = SchemaDescriptor(field) - else: - # Simple field. - self.field_descriptor_dict[field.name] = FieldDescriptor( - type=field.type, mode=field.mode) - - def get_field_descriptor(self, field_name): - # type: (str) -> FieldDescriptor - - if field_name in self.field_descriptor_dict: - return self.field_descriptor_dict[field_name] - else: - raise ValueError("Field descriptor not found. Not such field in Bigquery " - "schema: {}".format(field_name)) - - def get_record_schema_descriptor(self, record_name): - # type: (str) -> SchemaDescriptor - - if record_name in self.schema_descriptor_dict: - return self.schema_descriptor_dict[record_name] - else: - raise ValueError("Schema descriptor not found. Not such record " - "in Bigquery schema: {}".format(record_name)) diff --git a/gcp_variant_transforms/libs/schema_descriptor_test.py b/gcp_variant_transforms/libs/schema_descriptor_test.py deleted file mode 100644 index 7466a2d1a..000000000 --- a/gcp_variant_transforms/libs/schema_descriptor_test.py +++ /dev/null @@ -1,126 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Tests for infer_variant_header module.""" - - -from __future__ import absolute_import - - -import unittest - - -from apache_beam.io.gcp.internal.clients import bigquery -from gcp_variant_transforms.libs import bigquery_util -from gcp_variant_transforms.libs import schema_descriptor - - -class SchemaDescriptorTest(unittest.TestCase): - """Test case for :class:`SchemaDescriptor`""" - - def setUp(self): - self._boolean = bigquery_util.TableFieldConstants.TYPE_BOOLEAN - self._float = bigquery_util.TableFieldConstants.TYPE_FLOAT - self._integer = bigquery_util.TableFieldConstants.TYPE_INTEGER - self._record = bigquery_util.TableFieldConstants.TYPE_RECORD - self._string = bigquery_util.TableFieldConstants.TYPE_STRING - - self._nullable = bigquery_util.TableFieldConstants.MODE_NULLABLE - self._repeated = bigquery_util.TableFieldConstants.MODE_REPEATED - - def _get_table_schema(self): - # type (None) -> bigquery.TableSchema - schema = bigquery.TableSchema() - schema.fields.append(bigquery.TableFieldSchema( - name='field_1', type=self._string, mode=self._nullable, - description='foo desc')) - schema.fields.append(bigquery.TableFieldSchema( - name='field_2', type=self._integer, mode=self._repeated, - description='foo desc')) - # Record field. - record_field = bigquery.TableFieldSchema( - name='record_1', type=self._record, mode=self._repeated, - description='foo desc') - record_field.fields.append(bigquery.TableFieldSchema( - name='record_1-field_1', type=self._boolean, mode=self._nullable, - description='foo desc')) - record_field.fields.append(bigquery.TableFieldSchema( - name='record_1-field_2', type=self._float, mode=self._repeated, - description='foo desc')) - # Record field, two level deep. - deep_record_field = bigquery.TableFieldSchema( - name='record_1-record_2', type=self._record, mode=self._repeated, - description='foo desc') - deep_record_field.fields.append(bigquery.TableFieldSchema( - name='record_1-record_2-field_1', type=self._boolean, - mode=self._nullable, description='foo desc')) - - record_field.fields.append(deep_record_field) - schema.fields.append(record_field) - return schema - - def _get_schema_descriptor(self): - return schema_descriptor.SchemaDescriptor(self._get_table_schema()) - - def test_non_existence_field(self): - schema = self._get_schema_descriptor() - with self.assertRaises(ValueError): - schema.get_field_descriptor('non_existence_field') - self.fail('Non existence field should throw an exceprion') - - def test_non_existence_record(self): - schema = self._get_schema_descriptor() - with self.assertRaises(ValueError): - schema.get_record_schema_descriptor('non_existence_record') - self.fail('Non existence field should throw an exceprion') - - - def test_field_descriptor_at_first_level(self): - print self._get_table_schema() - schema = self._get_schema_descriptor() - - self.assertEqual( - schema.get_field_descriptor('field_1'), - schema_descriptor.FieldDescriptor( - type=self._string, mode=self._nullable)) - self.assertEqual( - schema.get_field_descriptor('field_2'), - schema_descriptor.FieldDescriptor( - type=self._integer, mode=self._repeated)) - - def test_field_descriptor_at_second_level(self): - main_schema = self._get_schema_descriptor() - record_schema = main_schema.get_record_schema_descriptor('record_1') - - self.assertEqual( - record_schema.get_field_descriptor('record_1-field_1'), - schema_descriptor.FieldDescriptor( - type=self._boolean, mode=self._nullable)) - self.assertEqual( - record_schema.get_field_descriptor('record_1-field_2'), - schema_descriptor.FieldDescriptor( - type=self._float, mode=self._repeated)) - - def test_field_descriptor_at_third_level(self): - main_schema = self._get_schema_descriptor() - parent_record_schema = main_schema.get_record_schema_descriptor( - 'record_1') - child_record_schema = parent_record_schema.get_record_schema_descriptor( - 'record_1-record_2') - - self.assertEqual( - child_record_schema.get_field_descriptor( - 'record_1-record_2-field_1'), - schema_descriptor.FieldDescriptor( - type=self._boolean, mode=self._nullable)) From 57f5408deaf820656203ae0bf4154730aee03abd Mon Sep 17 00:00:00 2001 From: Nima Mousavi <4221028+nmousavi@users.noreply.github.com> Date: Thu, 8 Mar 2018 14:39:42 -0500 Subject: [PATCH 3/5] Define SchemaDescriptor class. Provides serialization and lookup API for type/mode of schema fields. Tested: unit test --- .../libs/bigquery_schema_descriptor.py | 69 ++++++++++ .../libs/bigquery_schema_descriptor_test.py | 127 ++++++++++++++++++ 2 files changed, 196 insertions(+) create mode 100644 gcp_variant_transforms/libs/bigquery_schema_descriptor.py create mode 100644 gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py diff --git a/gcp_variant_transforms/libs/bigquery_schema_descriptor.py b/gcp_variant_transforms/libs/bigquery_schema_descriptor.py new file mode 100644 index 000000000..3604de0dc --- /dev/null +++ b/gcp_variant_transforms/libs/bigquery_schema_descriptor.py @@ -0,0 +1,69 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A dict based description for BigQuery's schema.""" + +from __future__ import absolute_import + +from collections import namedtuple +from apache_beam.io.gcp.internal.clients import bigquery #pylint: disable=unused-import + +__all__ = ['SchemaDescriptor'] + +#Stores data about a simple field (not a record) in BigQuery Schema. +FieldDescriptor = namedtuple('FieldDescriptor', ['type', 'mode']) + +class SchemaDescriptor(object): + """A dict based description for :class:`bigquery.TableSchema` object.""" + + def __init__(self, table_schema): + # type: (bigquery.TableSchema) -> None + + # Dict of (field_name, :class:`FieldDescriptor`). + self.field_descriptor_dict = {} + # Dict of (record_name, :class:`SchemaDescriptor`). + self.schema_descriptor_dict = {} + + self._extract_all_descriptors(table_schema) + + def _extract_all_descriptors(self, table_schema): + # type: (bigquery.TableSchema) -> None + """Extracts descriptor for fields and records in `table_schema`.""" + + for field in table_schema.fields: + if field.fields: + # Record field. + self.schema_descriptor_dict[field.name] = SchemaDescriptor(field) + else: + # Simple field. + self.field_descriptor_dict[field.name] = FieldDescriptor( + type=field.type, mode=field.mode) + + def get_field_descriptor(self, field_name): + # type: (str) -> FieldDescriptor + + if field_name in self.field_descriptor_dict: + return self.field_descriptor_dict[field_name] + else: + raise ValueError("Field descriptor not found. Not such field in Bigquery " + "schema: {}".format(field_name)) + + def get_record_schema_descriptor(self, record_name): + # type: (str) -> SchemaDescriptor + + if record_name in self.schema_descriptor_dict: + return self.schema_descriptor_dict[record_name] + else: + raise ValueError("Schema descriptor not found. Not such record " + "in Bigquery schema: {}".format(record_name)) diff --git a/gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py b/gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py new file mode 100644 index 000000000..2c6f0ebd1 --- /dev/null +++ b/gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py @@ -0,0 +1,127 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for infer_variant_header module.""" + + +from __future__ import absolute_import + + +import unittest + + +from apache_beam.io.gcp.internal.clients import bigquery +from gcp_variant_transforms.libs import bigquery_util +from gcp_variant_transforms.libs import bigquery_schema_descriptor + + +class SchemaDescriptorTest(unittest.TestCase): + """Test case for :class:`SchemaDescriptor`""" + + def setUp(self): + self._boolean = bigquery_util.TableFieldConstants.TYPE_BOOLEAN + self._float = bigquery_util.TableFieldConstants.TYPE_FLOAT + self._integer = bigquery_util.TableFieldConstants.TYPE_INTEGER + self._record = bigquery_util.TableFieldConstants.TYPE_RECORD + self._string = bigquery_util.TableFieldConstants.TYPE_STRING + + self._nullable = bigquery_util.TableFieldConstants.MODE_NULLABLE + self._repeated = bigquery_util.TableFieldConstants.MODE_REPEATED + + def _get_table_schema(self): + # type (None) -> bigquery.TableSchema + schema = bigquery.TableSchema() + schema.fields.append(bigquery.TableFieldSchema( + name='field_1', type=self._string, mode=self._nullable, + description='foo desc')) + schema.fields.append(bigquery.TableFieldSchema( + name='field_2', type=self._integer, mode=self._repeated, + description='foo desc')) + # Record field. + record_field = bigquery.TableFieldSchema( + name='record_1', type=self._record, mode=self._repeated, + description='foo desc') + record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-field_1', type=self._boolean, mode=self._nullable, + description='foo desc')) + record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-field_2', type=self._float, mode=self._repeated, + description='foo desc')) + # Record field, two level deep. + deep_record_field = bigquery.TableFieldSchema( + name='record_1-record_2', type=self._record, mode=self._repeated, + description='foo desc') + deep_record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-record_2-field_1', type=self._boolean, + mode=self._nullable, description='foo desc')) + + record_field.fields.append(deep_record_field) + schema.fields.append(record_field) + return schema + + def _get_schema_descriptor(self): + return bigquery_schema_descriptor.SchemaDescriptor( + self._get_table_schema()) + + def test_non_existence_field(self): + schema = self._get_schema_descriptor() + with self.assertRaises(ValueError): + schema.get_field_descriptor('non_existence_field') + self.fail('Non existence field should throw an exceprion') + + def test_non_existence_record(self): + schema = self._get_schema_descriptor() + with self.assertRaises(ValueError): + schema.get_record_schema_descriptor('non_existence_record') + self.fail('Non existence field should throw an exceprion') + + + def test_field_descriptor_at_first_level(self): + print self._get_table_schema() + schema = self._get_schema_descriptor() + + self.assertEqual( + schema.get_field_descriptor('field_1'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._string, mode=self._nullable)) + self.assertEqual( + schema.get_field_descriptor('field_2'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._integer, mode=self._repeated)) + + def test_field_descriptor_at_second_level(self): + main_schema = self._get_schema_descriptor() + record_schema = main_schema.get_record_schema_descriptor('record_1') + + self.assertEqual( + record_schema.get_field_descriptor('record_1-field_1'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._boolean, mode=self._nullable)) + self.assertEqual( + record_schema.get_field_descriptor('record_1-field_2'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._float, mode=self._repeated)) + + def test_field_descriptor_at_third_level(self): + main_schema = self._get_schema_descriptor() + parent_record_schema = main_schema.get_record_schema_descriptor( + 'record_1') + child_record_schema = parent_record_schema.get_record_schema_descriptor( + 'record_1-record_2') + + self.assertEqual( + child_record_schema.get_field_descriptor( + 'record_1-record_2-field_1'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._boolean, mode=self._nullable)) From 5d15a61cedc5341841ea13e9835c61712a19c676 Mon Sep 17 00:00:00 2001 From: allieychen <36679499+allieychen@users.noreply.github.com> Date: Thu, 1 Mar 2018 14:37:54 -0500 Subject: [PATCH 4/5] Multiple queries for integration test (#121) * Add multiple query support to integration tests #120. update all tests in integration small_tests to support multiple query. update the validate_table in run_tests, add a loop for all test cases in the test file. changed the required keys for the .json file. Remove "validation_query" and "expected_query_result", and add "test_cases". Ran ./deploy_and_run_tests.sh and all integration tests passed. Update the development guide doc (#124) Update the development guide doc. Add IntelliJ IDE setup. Add more details. Added an INFO message for the full command. Tested: Ran manually and checked the new log message. Uses the macros to replace the common queries. (#127) Define NUM_ROWS, SUM_START, SUM_END in QueryFormatter, and replaces them in the query to avoid duplicate code. TESTED: deploy_and_run_tests. Define SchemaDescriptor class. Provides serialization and lookup API for type/mode of schema fields. Tested: unit test --- docs/development_guide.md | 89 ++++++++++++- .../libs/bigquery_schema_descriptor.py | 69 ++++++++++ .../libs/bigquery_schema_descriptor_test.py | 124 ++++++++++++++++++ .../testing/integration/run_tests.py | 92 ++++++++++--- .../integration/small_tests/valid_4_0.json | 25 ++-- .../small_tests/valid_4_0_bz2.json | 25 ++-- .../integration/small_tests/valid_4_0_gz.json | 25 ++-- .../integration/small_tests/valid_4_1.json | 25 ++-- .../integration/small_tests/valid_4_1_gz.json | 25 ++-- .../integration/small_tests/valid_4_2.json | 25 ++-- .../small_tests/valid_4_2_VEP.json | 30 +++-- .../integration/small_tests/valid_4_2_gz.json | 25 ++-- gcp_variant_transforms/vcf_to_bq.py | 2 + 13 files changed, 478 insertions(+), 103 deletions(-) create mode 100644 gcp_variant_transforms/libs/bigquery_schema_descriptor.py create mode 100644 gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py diff --git a/docs/development_guide.md b/docs/development_guide.md index 826c9cbe5..eeb0a64cc 100644 --- a/docs/development_guide.md +++ b/docs/development_guide.md @@ -15,15 +15,20 @@ for more information on using pull requests. ## Setup ### Fork the repository on Github + Visit the [gcp-variant-transforms repository](https://github.com/googlegenomics/gcp-variant-transforms) to create your own fork of the repository. See [https://guides.github.com/activities/forking/](https://guides.github.com/activities/forking/) -for more information. +for more information. Do not create branches on the main repository. Meanwhile, +do not commit anything to the master of the forked repository to keep the +syncing process simple. + ### Setup dev environment #### Clone the forked repository + ```bash git clone git@github.com:/gcp-variant-transforms.git cd gcp_variant_transforms @@ -35,6 +40,7 @@ git remote add upstream git@github.com:googlegenomics/gcp-variant-transforms.git ``` #### Setup virtualenv + ```bash sudo apt-get install python-pip python-dev build-essential sudo pip install --upgrade pip @@ -44,6 +50,7 @@ virtualenv venv ``` #### Install dependences + ```bash pip install --upgrade . ``` @@ -51,7 +58,62 @@ Note that after running the above command we get some dependency conflicts in installed packages which is currently safe to ignore. For details see [Issue #71](https://github.com/googlegenomics/gcp-variant-transforms/issues/71). +### Setup IDE + +You may choose any IDE as you like. The following steps are intended for +IntelliJ users. + +#### Install IntelliJ IDE + +Download +[IntelliJ IDEA Community Edition](https://www.jetbrains.com/idea/download/#section=linux) +and install. + +#### Install Python plugin + +Choose File | Settings on the main menu, and then go to Plugins. +Click the Install JetBrains plugin button. +In the dialog that opens, search for Python Community Edition and then install +the plugin. + +For more details, refer to +[Install plugins](https://www.jetbrains.com/help/idea/installing-updating-and-uninstalling-repository-plugins.html). + +#### Create Project + +Choose File | New | Project on the main menu, and create a new Python project +in the dialog that opens. To setup the Project SDK, follow the following steps. +1. Click the New button, then add Local. +2. In the dialog that opens, click the Virtual Environment node. Select New +environment, and specify the location of the new virtual environment. Note that +the folder where the new virtual environment should be located must be empty! +For the Base interpreter, add the python path [PATH_TO_VENV]/bin/python under +the virtualenv directory created in "Setup virtualenv" above. + +Then go to Next, navigate the Project location to the local git project +directory created in "Clone the forked repository" step. Click Finish. + +#### Code Inspection + +The inspection profile in .idea/inspectionProfiles/Project_Default.xml is +checked into the git repository and can be imported into +File | Settings | Editor | Inspections. + +Code inspections can be run from the Analyze menu. To speed up the inspection +process, you can go to File | Project Structure | Modules and only set the +gcp_variant_transforms as the Sources. You may exclude other folders, or specify +the inspection scope to be only Module 'gcp-variant-transforms' when running +the inspection. The result window can be accessed from View > Tool Windows. + +#### Code Style + +To comply with pylint coding style, you may change the default line length in +File | Settings | Editor | Code Style. Set the hard wrap at 80 columns and +check Wrap on typing. Further, go to Python in the dropdown list, you can +set the indent to 2 and continuation indent to 4. + ## Making Changes + ### Create a branch in your forked repository Running this command will create a branch named `` and switch @@ -62,6 +124,7 @@ git checkout -b origin/master ``` ### Testing + To run all unit tests: ```bash @@ -88,6 +151,16 @@ For other projects you can use the `--project` and `--gs_dir` options of the script. ### Pushing changes to your fork's branch + +Before pushing changes, make sure the pylint checks pass. To install pylint: +```bash +source [PATH_TO_VENV]/bin/activate +pip install --upgrade pylint +``` +Then run: +```bash +pylint --rcfile=.pylintrc gcp_variant_transforms/ +``` To push changes to your forked branch, you can run: ```bash git add -p @@ -101,6 +174,7 @@ git push -u origin To commit and push those changes to your branch. ### Syncing your branch + If you want to pull in changes from the target branch (i.e. googlegenomic:master), run: ```bash @@ -117,8 +191,12 @@ to your branch. If this happens, you can force push after a rebase by runnning: ```bash git push -f ``` +For more information, you may check on +[merge](https://git-scm.com/book/en/v2/Git-Branching-Basic-Branching-and-Merging#_basic_merging) +and [rebase](https://git-scm.com/book/en/v2/Git-Branching-Rebasing). ### Creating a pull request + Once your changes are pushed and ready for review, you can create a pull request by visiting the [gcp-variant-transforms repository](https://github.com/googlegenomics/gcp-variant-transforms) @@ -131,8 +209,12 @@ description of how you have tested your change. As a minimum you should have unit-test coverage for your change and make sure integration tests pass. ### Updating changes -After making changes, you must again add, commit, and push those changes. Rather -than creating new commits for related changes please run the following: + +After making changes, you must again add, commit, and push those changes. It is +preferred to have one commit per review round such that your reviewers can +easily check what you have changed since last time. To create new commits, you +may follow the steps stated in "Pushing changes to your fork's branch". +Otherwise, please run the following: ```bash git add -p @@ -159,6 +241,7 @@ This approach is specially useful if you tend to do a lot of small commits during your feature development and like to keep them as checkpoints. ### Continuous integration + Once your pull request is approved and merged into the main repo, there is an automated process to create a new docker image from this commit, push it to the [Container Registry]( diff --git a/gcp_variant_transforms/libs/bigquery_schema_descriptor.py b/gcp_variant_transforms/libs/bigquery_schema_descriptor.py new file mode 100644 index 000000000..198d3a2da --- /dev/null +++ b/gcp_variant_transforms/libs/bigquery_schema_descriptor.py @@ -0,0 +1,69 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A dict based description for BigQuery's schema.""" + +from __future__ import absolute_import + +from collections import namedtuple +from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=unused-import + +__all__ = ['SchemaDescriptor'] + +# Stores data about a simple field (not a record) in BigQuery Schema. +FieldDescriptor = namedtuple('FieldDescriptor', ['type', 'mode']) + + +class SchemaDescriptor(object): + """A dict based description for :class:`bigquery.TableSchema` object.""" + + def __init__(self, table_schema): + # type: (bigquery.TableSchema) -> None + + # Dict of (field_name, :class:`FieldDescriptor`). + self.field_descriptor_dict = {} + # Dict of (record_name, :class:`SchemaDescriptor`). + self.schema_descriptor_dict = {} + + self._extract_all_descriptors(table_schema) + + def _extract_all_descriptors(self, table_schema): + # type: (bigquery.TableSchema) -> None + """Extracts descriptor for fields and records in `table_schema`.""" + + for field in table_schema.fields: + if field.fields: + # Record field. + self.schema_descriptor_dict[field.name] = SchemaDescriptor(field) + else: + # Simple field. + self.field_descriptor_dict[field.name] = FieldDescriptor( + type=field.type, mode=field.mode) + + def get_field_descriptor(self, field_name): + # type: (str) -> FieldDescriptor + + if field_name in self.field_descriptor_dict: + return self.field_descriptor_dict[field_name] + else: + raise ValueError('Field descriptor not found. Not such field in Bigquery ' + 'schema: {}'.format(field_name)) + + def get_record_schema_descriptor(self, record_name): + # type: (str) -> SchemaDescriptor + + if record_name in self.schema_descriptor_dict: + return self.schema_descriptor_dict[record_name] + else: + raise ValueError('Schema descriptor not found. Not such record ' + 'in Bigquery schema: {}'.format(record_name)) diff --git a/gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py b/gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py new file mode 100644 index 000000000..565b5ae07 --- /dev/null +++ b/gcp_variant_transforms/libs/bigquery_schema_descriptor_test.py @@ -0,0 +1,124 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for infer_variant_header module.""" + + +from __future__ import absolute_import + +import unittest + +from apache_beam.io.gcp.internal.clients import bigquery +from gcp_variant_transforms.libs import bigquery_schema_descriptor +from gcp_variant_transforms.libs import bigquery_util + + +class SchemaDescriptorTest(unittest.TestCase): + """Test case for :class:`SchemaDescriptor`.""" + + def setUp(self): + self._boolean = bigquery_util.TableFieldConstants.TYPE_BOOLEAN + self._float = bigquery_util.TableFieldConstants.TYPE_FLOAT + self._integer = bigquery_util.TableFieldConstants.TYPE_INTEGER + self._record = bigquery_util.TableFieldConstants.TYPE_RECORD + self._string = bigquery_util.TableFieldConstants.TYPE_STRING + + self._nullable = bigquery_util.TableFieldConstants.MODE_NULLABLE + self._repeated = bigquery_util.TableFieldConstants.MODE_REPEATED + + def _get_table_schema(self): + # type (None) -> bigquery.TableSchema + schema = bigquery.TableSchema() + schema.fields.append(bigquery.TableFieldSchema( + name='field_1', type=self._string, mode=self._nullable, + description='foo desc')) + schema.fields.append(bigquery.TableFieldSchema( + name='field_2', type=self._integer, mode=self._repeated, + description='foo desc')) + # Record field. + record_field = bigquery.TableFieldSchema( + name='record_1', type=self._record, mode=self._repeated, + description='foo desc') + record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-field_1', type=self._boolean, mode=self._nullable, + description='foo desc')) + record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-field_2', type=self._float, mode=self._repeated, + description='foo desc')) + # Record field, two level deep. + deep_record_field = bigquery.TableFieldSchema( + name='record_1-record_2', type=self._record, mode=self._repeated, + description='foo desc') + deep_record_field.fields.append(bigquery.TableFieldSchema( + name='record_1-record_2-field_1', type=self._boolean, + mode=self._nullable, description='foo desc')) + + record_field.fields.append(deep_record_field) + schema.fields.append(record_field) + return schema + + def _get_schema_descriptor(self): + return bigquery_schema_descriptor.SchemaDescriptor( + self._get_table_schema()) + + def test_non_existence_field(self): + schema = self._get_schema_descriptor() + with self.assertRaises(ValueError): + schema.get_field_descriptor('non_existence_field') + self.fail('Non existence field should throw an exceprion') + + def test_non_existence_record(self): + schema = self._get_schema_descriptor() + with self.assertRaises(ValueError): + schema.get_record_schema_descriptor('non_existence_record') + self.fail('Non existence field should throw an exceprion') + + def test_field_descriptor_at_first_level(self): + print self._get_table_schema() + schema = self._get_schema_descriptor() + + self.assertEqual( + schema.get_field_descriptor('field_1'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._string, mode=self._nullable)) + self.assertEqual( + schema.get_field_descriptor('field_2'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._integer, mode=self._repeated)) + + def test_field_descriptor_at_second_level(self): + main_schema = self._get_schema_descriptor() + record_schema = main_schema.get_record_schema_descriptor('record_1') + + self.assertEqual( + record_schema.get_field_descriptor('record_1-field_1'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._boolean, mode=self._nullable)) + self.assertEqual( + record_schema.get_field_descriptor('record_1-field_2'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._float, mode=self._repeated)) + + def test_field_descriptor_at_third_level(self): + main_schema = self._get_schema_descriptor() + parent_record_schema = main_schema.get_record_schema_descriptor( + 'record_1') + child_record_schema = parent_record_schema.get_record_schema_descriptor( + 'record_1-record_2') + + self.assertEqual( + child_record_schema.get_field_descriptor( + 'record_1-record_2-field_1'), + bigquery_schema_descriptor.FieldDescriptor( + type=self._boolean, mode=self._nullable)) diff --git a/gcp_variant_transforms/testing/integration/run_tests.py b/gcp_variant_transforms/testing/integration/run_tests.py index f8b7f5404..733afc547 100644 --- a/gcp_variant_transforms/testing/integration/run_tests.py +++ b/gcp_variant_transforms/testing/integration/run_tests.py @@ -33,6 +33,7 @@ """ import argparse +import enum import json import multiprocessing import os @@ -40,6 +41,7 @@ import time from datetime import datetime +from typing import List # pylint: disable=unused-import # TODO(bashir2): Figure out why pylint can't find this. # pylint: disable=no-name-in-module,import-error from google.cloud import bigquery @@ -73,8 +75,7 @@ def __init__(self, test_name, table_name, input_pattern, - validation_query, - expected_query_result, + assertion_configs, **kwargs): self._name = test_name @@ -82,8 +83,7 @@ def __init__(self, self._project = context.project self._table_name = '{}.{}'.format(dataset_id, table_name) output_table = '{}:{}'.format(context.project, self._table_name) - self._validation_query = (" ").join(validation_query) - self._expected_query_result = expected_query_result + self._assertion_configs = assertion_configs args = ['--input_pattern {}'.format(input_pattern), '--output_table {}'.format(output_table), '--project {}'.format(context.project), @@ -148,12 +148,26 @@ def _handle_failure(self, response): 'No traceback. See logs for more information on error.') def validate_table(self): - """Runs a simple query against the output table and verifies aggregates.""" + """Runs queries against the output table and verifies results.""" client = bigquery.Client(project=self._project) - # TODO(bashir2): Create macros for common queries and add the option for - # having a list of queries instead of just one. - query = self._validation_query.format(TABLE_NAME=self._table_name) - query_job = client.query(query) + query_formatter = QueryFormatter(self._table_name) + for assertion_config in self._assertion_configs: + query = query_formatter.format_query(assertion_config['query']) + assertion = QueryAssertion(client, query, assertion_config[ + 'expected_result']) + assertion.run_assertion() + + +class QueryAssertion(object): + """Runs a query and verifies that the output matches the expected result.""" + + def __init__(self, client, query, expected_result): + self._client = client + self._query = query + self._expected_result = expected_result + + def run_assertion(self): + query_job = self._client.query(self._query) assert query_job.state == 'RUNNING' iterator = query_job.result(timeout=60) rows = list(iterator) @@ -161,15 +175,52 @@ def validate_table(self): raise TestCaseFailure('Expected one row in query result, got {}'.format( len(rows))) row = rows[0] - if len(self._expected_query_result) != len(row): + if len(self._expected_result) != len(row): raise TestCaseFailure( 'Expected {} columns in the query result, got {}'.format( - len(self._expected_query_result), len(row))) - for key in self._expected_query_result.keys(): - if self._expected_query_result[key] != row.get(key): + len(self._expected_result), len(row))) + for key in self._expected_result.keys(): + if self._expected_result[key] != row.get(key): raise TestCaseFailure( 'Column {} mismatch: expected {}, got {}'.format( - key, self._expected_query_result[key], row.get(key))) + key, self._expected_result[key], row.get(key))) + + +class QueryFormatter(object): + """Formats a query. + + Replaces macros and variables in the query. + """ + + class _QueryMacros(enum.Enum): + NUM_ROWS_QUERY = 'SELECT COUNT(0) AS num_rows FROM {TABLE_NAME}' + SUM_START_QUERY = ( + 'SELECT SUM(start_position) AS sum_start FROM {TABLE_NAME}') + SUM_END_QUERY = 'SELECT SUM(end_position) AS sum_end FROM {TABLE_NAME}' + + def __init__(self, table_name): + # type: (str) -> None + self._table_name = table_name + + def format_query(self, query): + # type: (List[str]) -> str + """Formats the given ``query``. + + Formatting logic is as follows: + - Concatenates ``query`` parts into one string. + - Replaces macro with the corresponding value defined in _QueryMacros. + - Replaces variables associated for the query. + """ + return self._replace_variables(self._replace_macros(' '.join(query))) + + def _replace_variables(self, query): + return query.format(TABLE_NAME=self._table_name) + + def _replace_macros(self, query): + for macro in self._QueryMacros: + if macro.name == query: + return macro.value + return query class TestContextManager(object): @@ -272,11 +323,22 @@ def _load_test_config(filename): def _validate_test(test, filename): required_keys = ['test_name', 'table_name', 'input_pattern', - 'validation_query', 'expected_query_result'] + 'assertion_configs'] for key in required_keys: if key not in test: raise ValueError('Test case in {} is missing required key: {}'.format( filename, key)) + assertion_configs = test['assertion_configs'] + for assertion_config in assertion_configs: + _validate_assertion_config(assertion_config) + + +def _validate_assertion_config(assertion_config): + required_keys = ['query', 'expected_result'] + for key in required_keys: + if key not in assertion_config: + raise ValueError('Test case in {} is missing required key: {}'.format( + assertion_config, key)) def _run_test(test, context): diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_0.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_0.json index ce3f94c58..324b3efd9 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_0.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_0.json @@ -3,15 +3,18 @@ "table_name": "valid_4_0", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 5, - "sum_start": 3607195, - "sum_end": 3607203 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 5} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 3607195} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 3607203} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_bz2.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_bz2.json index eaae90194..6670898a4 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_bz2.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_bz2.json @@ -3,15 +3,18 @@ "table_name": "valid_4_0_bz2", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf.bz2", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 5, - "sum_start": 3607195, - "sum_end": 3607203 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 5} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 3607195} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 3607203} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_gz.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_gz.json index 3d09dccfd..dfd5703ba 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_gz.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_0_gz.json @@ -3,15 +3,18 @@ "table_name": "valid_4_0_gz", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf.gz", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 5, - "sum_start": 3607195, - "sum_end": 3607203 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 5} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 3607195} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 3607203} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_1.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_1.json index d93d28eb4..7cdd9e86f 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_1.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_1.json @@ -3,15 +3,18 @@ "table_name": "valid_4_1", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.1-large.vcf", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 9882, - "sum_start": 5434957328, - "sum_end": 5435327553 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 9882} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 5434957328} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 5435327553} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_1_gz.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_1_gz.json index 9b6991a2d..e54eec05c 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_1_gz.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_1_gz.json @@ -3,15 +3,18 @@ "table_name": "valid_4_1_gz", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.1-large.vcf.gz", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 9882, - "sum_start": 5434957328, - "sum_end": 5435327553 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 9882} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 5434957328} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 5435327553} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2.json index 3557026da..f8e39b4de 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2.json @@ -3,15 +3,18 @@ "table_name": "valid_4_2", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 13, - "sum_start": 23031929, - "sum_end": 23033052 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 13} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 23031929} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 23033052} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json index a8fd827eb..6c8867d13 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json @@ -4,12 +4,26 @@ "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2_VEP.vcf", "annotation_field": "CSQ", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", - "FROM {TABLE_NAME} AS t, t.alternate_bases as alts, alts.CSQ as CSQ ", - "WHERE start_position = 1110695 AND alts.alt = 'G'" - ], - "expected_query_result": { - "num_features": 3 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 11} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 21801693} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 21802814} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", + "FROM {TABLE_NAME} AS t, t.alternate_bases as alts, alts.CSQ as CSQ ", + "WHERE start_position = 1110695 AND alts.alt = 'G'" + ], + "expected_result": {"num_features": 3} + } + ] } diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_gz.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_gz.json index dba30fcdb..db41156d0 100644 --- a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_gz.json +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_gz.json @@ -3,15 +3,18 @@ "table_name": "valid_4_2_gz", "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf.gz", "runner": "DataflowRunner", - "validation_query": [ - "SELECT COUNT(0) AS num_rows, ", - " SUM(start_position) AS sum_start, ", - " SUM(end_position) AS sum_end ", - "FROM {TABLE_NAME}" - ], - "expected_query_result": { - "num_rows": 13, - "sum_start": 23031929, - "sum_end": 23033052 - } + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 13} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 23031929} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 23033052} + } + ] } diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 83afe8773..c8ce33abc 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -37,6 +37,7 @@ import datetime import enum import logging +import sys import tempfile import apache_beam as beam @@ -209,6 +210,7 @@ def _validate_args(options, parsed_args): def run(argv=None): """Runs VCF to BigQuery pipeline.""" + logging.info('Command: %s', ' '.join(argv or sys.argv)) parser = argparse.ArgumentParser() parser.register('type', 'bool', lambda v: v.lower() == 'true') command_line_options = [option() for option in _COMMAND_LINE_OPTIONS] From eed9e71ed308a39103295367a970ad35708e60b0 Mon Sep 17 00:00:00 2001 From: Nima Mousavi <4221028+nmousavi@users.noreply.github.com> Date: Wed, 14 Mar 2018 17:51:46 -0400 Subject: [PATCH 5/5] SchemaDescriptor class Tested: unit test --- .../libs/bigquery_schema_descriptor.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/gcp_variant_transforms/libs/bigquery_schema_descriptor.py b/gcp_variant_transforms/libs/bigquery_schema_descriptor.py index 17983ff19..f7002d106 100644 --- a/gcp_variant_transforms/libs/bigquery_schema_descriptor.py +++ b/gcp_variant_transforms/libs/bigquery_schema_descriptor.py @@ -18,23 +18,26 @@ from typing import NamedTuple from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=unused-import -__all__ = ['SchemaDescriptor'] - # Stores data about a simple field (not a record) in BigQuery Schema. FieldDescriptor = NamedTuple('FieldDescriptor', [('type', str), ('mode', str)]) class SchemaDescriptor(object): - """A dict based description for :class:`bigquery.TableSchema` object.""" + """A dict based description for :class:`bigquery.TableSchema` object. + + This class provides APIs for checking if and how (e.g. type, mode) a field + is defined in the BigQuery schema. This is useful when checking if + data matches its field definition in the schema for example. + """ def __init__(self, table_schema): # type: (bigquery.TableSchema) -> None # Dict of (field_name, :class:`FieldDescriptor`). - self.field_descriptor_dict = {} + self._field_descriptor_dict = {} # Dict of (record_name, :class:`SchemaDescriptor`). - self.schema_descriptor_dict = {} + self._schema_descriptor_dict = {} self._extract_all_descriptors(table_schema) @@ -44,10 +47,10 @@ def _extract_all_descriptors(self, table_schema): for field in table_schema.fields: if field.fields: # Record field. - self.schema_descriptor_dict[field.name] = SchemaDescriptor(field) + self._schema_descriptor_dict[field.name] = SchemaDescriptor(field) else: # Simple field. - self.field_descriptor_dict[field.name] = FieldDescriptor( + self._field_descriptor_dict[field.name] = FieldDescriptor( type=field.type, mode=field.mode) def get_field_descriptor(self, field_name): @@ -57,16 +60,16 @@ def get_field_descriptor(self, field_name): Args: field_name: name of a simple (not a record) field in BigQuery table. """ - if field_name in self.field_descriptor_dict: - return self.field_descriptor_dict[field_name] + if field_name in self._field_descriptor_dict: + return self._field_descriptor_dict[field_name] else: raise ValueError('Field descriptor not found. Not such field in Bigquery ' 'schema: {}'.format(field_name)) def get_record_schema_descriptor(self, record_name): # type: (str) -> SchemaDescriptor - if record_name in self.schema_descriptor_dict: - return self.schema_descriptor_dict[record_name] + if record_name in self._schema_descriptor_dict: + return self._schema_descriptor_dict[record_name] else: raise ValueError('Schema descriptor not found. No such record ' 'in Bigquery schema: {}'.format(record_name))