From f6a94610b674760075c3c0af66b7b03da154f2bc Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Fri, 10 Feb 2017 14:19:53 -0800 Subject: [PATCH 1/2] Updates BigQuery read transform so that DirectRunner handles 'null' fields properly. Before this change, for DirectRunner, a record (dictionary) returned by the BigQuery read transform will not contain keys for fields that are 'null'. For DataflowRunner, these fields will be available with value 'None'. I believe retaining these fields with value 'None' is the proper behavior here. This change makes these two runners consistent when it comes to handling BigQuery 'null' values. --- sdks/python/apache_beam/io/bigquery.py | 11 +++++------ sdks/python/apache_beam/io/bigquery_test.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 8f55f42d147c..4d81b5d6d47f 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -1065,17 +1065,16 @@ def convert_row_to_dict(self, row, schema): value = None if isinstance(schema, bigquery.TableSchema): cell = row.f[index] - if cell.v is None: - continue # Field not present in the row. - value = from_json_value(cell.v) + value = from_json_value(cell.v) if cell.v is not None else None elif isinstance(schema, bigquery.TableFieldSchema): cell = row['f'][index] - if 'v' not in cell: - continue # Field not present in the row. 
- value = cell['v'] + value = cell['v'] if 'v' in cell else None if field.mode == 'REPEATED': result[field.name] = [self._convert_cell_value_to_dict(x['v'], field) for x in value] + elif value is None: + assert field.mode == 'NULLABLE' + result[field.name] = None else: result[field.name] = self._convert_cell_value_to_dict(value, field) return result diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index cdca884fa72e..b64121ad38f2 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -308,14 +308,19 @@ def get_test_rows(self): 'ts': '22:39:12.627498', 'dt_ts': '2008-12-25T07:30:00', 'r': {'s2': 'b'}, - 'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': 'f'}]}] + 'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': None}]}] }, { 'i': 10, 's': 'xyz', 'f': -3.14, 'b': False, - 'rpr': [] + 'rpr': [], + 't': None, + 'dt': None, + 'ts': None, + 'dt_ts': None, + 'r': None, }] nested_schema = [ @@ -372,7 +377,7 @@ def get_test_rows(self): # schemas correctly so we have to use this f,v based format bigquery.TableCell(v=to_json_value({'f': [{'v': 'b'}]})), bigquery.TableCell(v=to_json_value([{'v':{'f':[{'v':'c'}, {'v':[ - {'v':{'f':[{'v':[{'v':'d'}, {'v':'e'}]}, {'v':'f'}]}}]}]}}])) + {'v':{'f':[{'v':[{'v':'d'}, {'v':'e'}]}, {'v':None}]}}]}]}}])) ]), bigquery.TableRow(f=[ bigquery.TableCell(v=to_json_value('false')), From c1c50bd471f74465952ec604079a5c33464e1efa Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Fri, 10 Feb 2017 16:56:49 -0800 Subject: [PATCH 2/2] Addressing reviewer comments. 
--- sdks/python/apache_beam/io/bigquery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 4d81b5d6d47f..93033ead219f 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -1073,7 +1073,9 @@ def convert_row_to_dict(self, row, schema): result[field.name] = [self._convert_cell_value_to_dict(x['v'], field) for x in value] elif value is None: - assert field.mode == 'NULLABLE' + if not field.mode == 'NULLABLE': + raise ValueError('Received \'None\' as the value for the field %s ' + 'but the field is not NULLABLE.', field.name) result[field.name] = None else: result[field.name] = self._convert_cell_value_to_dict(value, field)