[BEAM-6769][BEAM-7327] Add IT test for writing and reading with BigQuery IO #8621
Conversation
Run Python PostCommit
Run Python PostCommit
Run Python PostCommit
Force-pushed from 5e22885 to ffa3399 (compare)
Run Python PostCommit
Run Python PostCommit
Force-pushed from a5edbee to 95aa720 (compare)
Run Python PostCommit
Run Python PostCommit
Force-pushed from df83d85 to 99f3cbc (compare)
Run Python PostCommit
Passing postcommits! Now you can just fix the lint issue, and you should be good to go :)
@@ -654,12 +654,17 @@ def apply_WriteToBigQuery(self, transform, pcoll, options):
      return self.apply_PTransform(transform, pcoll, options)
    else:
      from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
      if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT \
          or transform.schema is None:
        schema = transform.schema
Maybe here we want schema to be None always? That way we don't have to special-case further down?
Actually, I've added a comment about this here: https://issues.apache.org/jira/browse/BEAM-7382.
If we have autodetection while using the BigQuerySink, we should error out, as it is not supported.
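To illustrate the point, a minimal, hypothetical sketch of such a check; the constant value, function name, and error message are assumptions, not the PR's actual code:

```python
# Placeholder for beam.io.gcp.bigquery.SCHEMA_AUTODETECT; the value is illustrative.
SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT'


def validate_bigquery_sink_schema(schema):
  """Reject schema auto-detection when writing through the native BigQuerySink."""
  if schema == SCHEMA_AUTODETECT:
    raise ValueError(
        'Schema auto-detection is not supported when writing via the native '
        'BigQuerySink; please provide an explicit schema.')
  return schema
```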
Force-pushed from a8e0895 to f4725df (compare)
Run Python PostCommit
Run Python PostCommit
Run Python PostCommit
@tvalentyn, @pabloem: R (the postcommits are failing, but I don't think it is related to my PR)
@@ -925,6 +925,12 @@ def __iter__(self):
      if self.schema is None:
        self.schema = schema
      for row in rows:
        # return base64 encoded bytes as byte type on python 3
        # to match behavior DataflowRunner
Let's say "which matches the behavior of Beam Java SDK". Note that this does not depend on Py3.
Thanks, Juta
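For context, a minimal sketch of the read-side conversion being discussed, assuming the BYTES field arrives from the BigQuery REST API as a base64 string (the function name is illustrative):

```python
import base64


def bytes_from_bq_field(value):
  """Convert a base64-encoded BYTES field, as returned by the BigQuery REST
  API, into raw bytes, which matches the behavior of the Beam Java SDK."""
  return base64.b64decode(value)


print(bytes_from_bq_field('YWJj'))  # b'abc'
```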
@@ -998,6 +1004,13 @@ def encode(self, table_row):
    # This code will catch this error to emit an error that explains
    # to the programmer that they have used NAN/INF values.
    try:
      # on python 3 base64 bytes are decoded to strings before being send to bq
      if sys.version[0] == '3':
Let's replace == '3' with > 2.
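A small sketch of the suggested form of the check; the surrounding logic is omitted, and sys.version_info is the integer-tuple alternative to slicing the sys.version string:

```python
import sys

# Compare the major version as an integer instead of slicing sys.version.
IS_PY3 = sys.version_info[0] > 2

if IS_PY3:
  # Python 3-only handling (e.g. decoding base64-encoded bytes) would go here.
  pass
```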
@@ -998,6 +1004,13 @@ def encode(self, table_row):
    # This code will catch this error to emit an error that explains
    # to the programmer that they have used NAN/INF values.
    try:
      # on python 3 base64 bytes are decoded to strings before being send to bq
      if sys.version[0] == '3':
        if type(table_row) == str:
When do we go through this branch? Why do we only go through json.loads() here on Python 3, but not Python 2?
This codepath will decode bytes when they are written to BigQuery (it expects bytes to be base64-encoded but allows them to have type bytes or str). This is to allow the same data that is read from BQ to also be written to BQ.
In https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py#L54 data is written to BigQuery as a string, which is why we first do a json.loads() before handling the bytes decoding (and this is only necessary on Python 3).
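As an illustration of the behavior described above, a hedged sketch (the helper name is hypothetical and the actual coder in the PR may differ):

```python
import json
import sys


def normalize_row_for_bq(table_row):
  """Accept either a JSON string or a dict, and ensure base64 payloads are
  str (not bytes) before JSON serialization; this only matters on Python 3."""
  if isinstance(table_row, str):
    table_row = json.loads(table_row)
  if sys.version_info[0] > 2:
    table_row = {
        key: value.decode('ascii') if isinstance(value, bytes) else value
        for key, value in table_row.items()}
  return table_row
```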
I still don't understand why/when we may receive a string here and not a dictionary (as per the docstring) during the encode() call. The test you referenced seems to call json.loads() before ingesting data into BQ:
_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
In https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py#L244 no json.loads() is done. Should I edit the tests to pass dicts instead of strings?
I think that the test needs to be fixed to pass dicts, or the BQ IO file loads method needs to use a different codec, one that does not expect dicts.
I suggest we change the test to pass dicts. @pabloem, @chamikaramj - do you have a different opinion on this?
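A short sketch of that suggestion, with hypothetical test data shaped like the (destination, JSON-row) pairs under discussion:

```python
import json

# Hypothetical elements in the (destination, JSON string) shape discussed above.
_DESTINATION_ELEMENT_PAIRS = [
    ('project:dataset.table', '{"name": "beam", "language": "py"}'),
    ('project:dataset.table', '{"name": "flink", "language": "java"}'),
]

# Passing dicts instead of JSON strings keeps the coder's documented contract:
# encode() receives a dict, never a raw string.
_ELEMENTS = [json.loads(row) for _, row in _DESTINATION_ELEMENT_PAIRS]
```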
@@ -998,6 +1004,13 @@ def encode(self, table_row):
    # This code will catch this error to emit an error that explains
    # to the programmer that they have used NAN/INF values.
    try:
      # on python 3 base64 bytes are decoded to strings before being send to bq
nit: let's change "base64 bytes" to "base64-encoded bytes".
Run Python PreCommit
Run Python PostCommit
@tvalentyn PTAL
Run Python PostCommit
Thanks a lot Juta for making this change and also improving BQ test coverage. The change looks good to me except for one place I still don't understand.
@@ -32,6 +32,8 @@ task postCommitIT(dependsOn: 'installGcpTest') {
        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
        "apache_beam.io.gcp.bigquery_io_read_it_test",
        "apache_beam.io.gcp.bigquery_read_it_test",
We should probably rename apache_beam.io.gcp.bigquery_io_read_it_test in the future or combine it with the scenarios added here to avoid naming collision. We can do it in a later change.
        str(int(time.time())),
        random.randint(0, 10000))
    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
    self.output_table = "%s.{}" % (self.dataset_id)
I find this field confusing since at this point the output table is not created yet. We could return a fully-qualified table name from the create_table() method instead of storing the pattern.
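A rough sketch of that alternative, with illustrative names (the real test class and its BigQuery wrapper calls may differ):

```python
import random
import time


class ReadTestsSketch(object):
  """Illustrative only: build the fully-qualified table name inside
  create_table() instead of keeping a '%s.{}' pattern on the instance."""

  def __init__(self, project, bigquery_client):
    self.project = project
    self.bigquery_client = bigquery_client
    self.dataset_id = 'python_read_table_%d%d' % (
        int(time.time()), random.randint(0, 10000))
    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)

  def create_table(self, table_name):
    # ... create the table under self.dataset_id via self.bigquery_client ...
    # Return the fully-qualified name so callers never see a half-built pattern.
    return '%s.%s' % (self.dataset_id, table_name)
```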
Run Python PostCommit
Run Python PostCommit
Run Python PostCommit
Thanks, Juta, this looks good to me. Very happy to see an increase in test coverage.
@pabloem, can you please take a look and help with the merge?
LGTM. Just left one question that can be answered after merge.
  def decode(self, encoded_table_row):
    return self.coder.decode(encoded_table_row)
We can merge this now. I am just curious: Why did you add this custom coder?
Please take a look at the #8621 (comment) thread and let us know if you think we should do something else here.
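For readers following along, a self-contained sketch of a delegating coder in the shape of the decode() snippet above; the class names and the wrapped JSON coder are purely illustrative:

```python
import json


class _JsonRowCoder(object):
  """Illustrative inner coder: dict <-> UTF-8 JSON bytes."""

  def encode(self, table_row):
    return json.dumps(table_row).encode('utf-8')

  def decode(self, encoded_table_row):
    return json.loads(encoded_table_row.decode('utf-8'))


class _DelegatingTableRowCoder(object):
  """Wraps another coder and forwards encode()/decode(), mirroring the
  decode() method shown in the diff above."""

  def __init__(self, coder=None):
    self.coder = coder or _JsonRowCoder()

  def encode(self, table_row):
    return self.coder.encode(table_row)

  def decode(self, encoded_table_row):
    return self.coder.decode(encoded_table_row)
```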
This PR adds IT suites for Python 3.6. It is part of a series of PRs with the goal of making Apache Beam Py3 compatible. The proposal with the outlined approach has been documented here: https://s.apache.org/beam-python-3.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.