From f3da5eb0f70f51c3e0b4b304b55d56cba7cd3f99 Mon Sep 17 00:00:00 2001 From: "chamikara@google.com" Date: Wed, 22 Mar 2017 13:17:26 -0700 Subject: [PATCH] Updates BigQuery read transform to correctly process empty repeated fields. --- sdks/python/apache_beam/io/gcp/bigquery.py | 13 +++++++++++-- sdks/python/apache_beam/io/gcp/bigquery_test.py | 3 ++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 3186a5575dc9..a917d5115e07 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1069,8 +1069,17 @@ def convert_row_to_dict(self, row, schema): cell = row['f'][index] value = cell['v'] if 'v' in cell else None if field.mode == 'REPEATED': - result[field.name] = [self._convert_cell_value_to_dict(x['v'], field) - for x in value] + if value is None: + # We receive 'None' for repeated fields without any values when + # 'flatten_results' is 'False'. + # When 'flatten_results' is 'True', we receive individual values + # instead of a list of values hence we do not hit this condition. + # We return an empty list here instead of 'None' to be consistent with + # other runners and to be backwards compatible to users. + result[field.name] = [] + else: + result[field.name] = [self._convert_cell_value_to_dict(x['v'], field) + for x in value] elif value is None: if not field.mode == 'NULLABLE': raise ValueError('Received \'None\' as the value for the field %s ' diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index fbf073cf3f95..2b83079ba865 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -402,7 +402,8 @@ def get_test_rows(self): bigquery.TableCell(v=None), bigquery.TableCell(v=None), bigquery.TableCell(v=None), - bigquery.TableCell(v=to_json_value([]))])] + # REPEATED field without any values. + bigquery.TableCell(v=None)])] return table_rows, schema, expected_rows def test_read_from_table(self):