[BEAM-13391] Fix temporary file format in WriteToBigQuery #16156

Merged (7 commits, Jan 11, 2022)
11 changes: 5 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -335,7 +335,8 @@ class UpdateDestinationSchema(beam.DoFn):

This transform emits (destination, job_reference) pairs where the
job_reference refers to a submitted load job for performing the schema
modification. Note that the input and output job references are not the same.
modification in JSON format. Note that the input and output job references
are not the same.

Experimental; no backwards compatibility guarantees.
"""
@@ -345,13 +346,11 @@ def __init__(
test_client=None,
additional_bq_parameters=None,
step_name=None,
source_format=None,
load_job_project_id=None):
self._test_client = test_client
self._write_disposition = write_disposition
self._additional_bq_parameters = additional_bq_parameters or {}
self._step_name = step_name
self._source_format = source_format
self._load_job_project_id = load_job_project_id

def setup(self):
@@ -362,7 +361,6 @@ def display_data(self):
return {
'write_disposition': str(self._write_disposition),
'additional_bq_params': str(self._additional_bq_parameters),
'source_format': str(self._source_format),
}

def process(self, element, schema_mod_job_name_prefix):
@@ -441,7 +439,9 @@ def process(self, element, schema_mod_job_name_prefix):
create_disposition='CREATE_NEVER',
additional_load_parameters=additional_parameters,
job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
source_format=self._source_format,
# JSON format is hardcoded because it permits a zero-row load (unlike
# AVRO) and a nested schema (unlike CSV, which only supports a flat one).
source_format="NEWLINE_DELIMITED_JSON",
load_job_project_id=self._load_job_project_id)
yield (destination, schema_update_job_reference)

@@ -1043,7 +1043,6 @@ def _load_data(
test_client=self.test_client,
additional_bq_parameters=self.additional_bq_parameters,
step_name=step_name,
source_format=self._temp_file_format,
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))

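In effect, UpdateDestinationSchema no longer inherits the sink's temp_file_format: the schema-modification load job it submits always uses NEWLINE_DELIMITED_JSON, since JSON permits a zero-row load and a nested schema. The sketch below is not part of the PR; the table spec, schema, and run() wrapper are placeholder illustrations of the kind of user pipeline this matters for, where data is appended with AVRO temp files while schemaUpdateOptions is set, and the internal schema-update job now runs in JSON regardless.

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import FileFormat


def run(argv=None):
  # Placeholder rows and schema: appending to an existing table so that
  # schemaUpdateOptions can take effect on the destination.
  rows = [{'int64': 1, 'bool': True}, {'int64': 2, 'bool': False}]
  schema = {
      'fields': [
          {'name': 'int64', 'type': 'INT64', 'mode': 'NULLABLE'},
          {'name': 'bool', 'type': 'BOOL', 'mode': 'NULLABLE'},
      ]
  }
  with beam.Pipeline(argv=argv) as p:
    _ = (
        p
        | beam.Create(rows)
        | beam.io.WriteToBigQuery(
            'my-project:my_dataset.my_table',  # placeholder table spec
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            # Data load jobs use AVRO temp files; the schema-update job
            # issued internally now always uses NEWLINE_DELIMITED_JSON.
            temp_file_format=FileFormat.AVRO,
            additional_bq_parameters={
                'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']
            }))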
95 changes: 69 additions & 26 deletions sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -32,6 +32,8 @@
import mock
import pytest
import pytz
from parameterized import param
from parameterized import parameterized

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -84,6 +86,11 @@ def tearDown(self):
def create_table(self, table_name):
table_schema = bigquery.TableSchema()
table_field = bigquery.TableFieldSchema()
table_field.name = 'int64'
table_field.type = 'INT64'
table_field.mode = 'REQUIRED'
table_schema.fields.append(table_field)
table_field = bigquery.TableFieldSchema()
table_field.name = 'bytes'
table_field.type = 'BYTES'
table_schema.fields.append(table_field)
@@ -297,16 +304,25 @@ def test_big_query_write_without_schema(self):
table_id = '{}.{}'.format(self.dataset_id, table_name)

input_data = [{
'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'
}, {
'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'
'int64': 1,
'bytes': b'xyw',
'date': '2011-01-01',
'time': '23:59:59.999999'
},
{
'int64': 2,
'bytes': b'abc',
'date': '2000-01-01',
'time': '00:00:00'
},
{
'int64': 3,
'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd',
'date': '3000-12-31',
'time': '23:59:59'
},
{
'int64': 4,
'bytes': b'\xab\xac\xad',
'date': '2000-01-01',
'time': '00:00:00'
@@ -318,22 +334,27 @@ def test_big_query_write_without_schema(self):
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT bytes, date, time FROM %s" % table_id,
query="SELECT int64, bytes, date, time FROM %s" % table_id,
data=[(
1,
b'xyw',
datetime.date(2011, 1, 1),
datetime.time(23, 59, 59, 999999),
), (
b'abc',
datetime.date(2000, 1, 1),
datetime.time(0, 0, 0),
),
(
2,
b'abc',
datetime.date(2000, 1, 1),
datetime.time(0, 0, 0),
),
(
3,
b'\xe4\xbd\xa0\xe5\xa5\xbd',
datetime.date(3000, 12, 31),
datetime.time(23, 59, 59),
),
(
4,
b'\xab\xac\xad',
datetime.date(2000, 1, 1),
datetime.time(0, 0, 0),
@@ -353,12 +374,17 @@ def test_big_query_write_without_schema(self):
temp_file_format=FileFormat.JSON))

@pytest.mark.it_postcommit
@parameterized.expand([
param(file_format=FileFormat.AVRO),
param(file_format=FileFormat.JSON),
param(file_format=None),
])
@mock.patch(
"apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
def test_big_query_write_temp_table_append_schema_update(self):
def test_big_query_write_temp_table_append_schema_update(self, file_format):
"""
Test that schema update options are respected when appending to an existing
table via temporary tables.
Test that schema update options (including on nested fields) and schema
relaxation are respected when appending to an existing table via temporary
tables.

_MAXIMUM_SOURCE_URIS and max_file_size are both set to 1 to force multiple
load jobs and usage of temporary tables.
@@ -367,17 +393,16 @@ def test_big_query_write_temp_table_append_schema_update(self):
self.create_table(table_name)
table_id = '{}.{}'.format(self.dataset_id, table_name)

input_data = [{
"int64": num, "bool": True, "nested_field": {
"fruit": "Apple"
}
} for num in range(1, 3)]

# bytes, date, time fields are optional and omitted in the test
# only required and new columns are specified
table_schema = {
"fields": [{
"name": "int64", "type": "INT64"
"name": "int64",
"type": "INT64",
"mode": "NULLABLE",
}, {
"name": "bool", "type": "BOOL"
"name": "bool",
"type": "BOOL",
},
{
"name": "nested_field",
@@ -392,18 +417,34 @@ def test_big_query_write_temp_table_append_schema_update(self):
]
}]
}

input_data = [{
"int64": 1, "bool": True, "nested_field": [{
"fruit": "Apple"
}]
}, {
"bool": False, "nested_field": [{
"fruit": "Mango"
}]
},
{
"int64": None,
"bool": True,
"nested_field": [{
"fruit": "Banana"
}]
}]
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=BigqueryFullResultMatcher(
project=self.project,
query="""
SELECT bytes, date, time, int64, bool, fruit
FROM %s,
FROM {},
UNNEST(nested_field) as nested_field
ORDER BY int64
""" % table_id,
data=[(None, None, None, num, True, "Apple")
for num in range(1, 3)]))
ORDER BY fruit
""".format(table_id),
data=[(None, None, None, 1, True,
"Apple"), (None, None, None, None, True, "Banana"), (
None, None, None, None, False, "Mango")]))

with beam.Pipeline(argv=args) as p:
# pylint: disable=expression-not-assigned
@@ -416,7 +457,9 @@ def test_big_query_write_temp_table_append_schema_update(self):
max_file_size=1, # bytes
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
additional_bq_parameters={
'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']}))
'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION',
'ALLOW_FIELD_RELAXATION']},
temp_file_format=file_format))


if __name__ == '__main__':
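The updated integration test is fanned out over temp file formats with parameterized.expand, and it pins _MAXIMUM_SOURCE_URIS and max_file_size to 1 so that temporary tables are actually exercised. A stripped-down illustration of the parameterized pattern follows; it is not taken from the PR, and the test class and method names are made up.

import unittest

from parameterized import param
from parameterized import parameterized

from apache_beam.io.gcp.bigquery_tools import FileFormat


class FileFormatParamTest(unittest.TestCase):
  # Illustrative only: each param() below becomes its own generated test
  # case, mirroring how the PR parameterizes the schema-update test.
  @parameterized.expand([
      param(file_format=FileFormat.AVRO),
      param(file_format=FileFormat.JSON),
      param(file_format=None),  # None exercises the sink's default format
  ])
  def test_receives_file_format(self, file_format):
    self.assertIn(file_format, (FileFormat.AVRO, FileFormat.JSON, None))


if __name__ == '__main__':
  unittest.main()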