
Update document Storage API Support in Google BigQuery I/O connector for Python SDK #26889

Merged
merged 16 commits into apache:master
Jul 11, 2023

Conversation

RyuSA
Contributor

@RyuSA RyuSA commented May 25, 2023


This pull request contains the changes below. 👏

  • Add usage of WriteToBigQuery.Method.STORAGE_WRITE_API to the document.
  • Add usage of ReadFromBigQuery.Method.DIRECT_READ to the document.

Here is example code to check the features:
https://gist.github.com/RyuSA/84968c2322771ce411e63154593b2319
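
As a hedged sketch (the table name is a placeholder and this is separate from the gist above), reading with the Storage Read API via DIRECT_READ looks roughly like this; the STORAGE_WRITE_API snippet appears later in the review thread.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    rows = (
        pipeline
        # DIRECT_READ reads straight from the BigQuery Storage Read API
        # instead of exporting the table to GCS first.
        | beam.io.ReadFromBigQuery(
            table='my-project:my_dataset.my_table',
            method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
        | beam.Map(print))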

Context

In the original issue (#26693) we discussed only STORAGE_WRITE_API, but I found that the BigQuery Storage Read API is also supported now, so I added usage of the read API as well. If you see any concerns, please let me know.



RyuSA added 2 commits May 25, 2023 08:17
Add usage of BigQuery StorageWriteAPI in Python.
Add usage of BigQuery StorageReadAPI in Python.
@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @AnandInguva for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Abacn
Contributor

Abacn commented May 25, 2023

R: @ahmedabu98

Also, please check the "PythonFormatter" and "PythonLint" results. You could format the code by running:

# Run from root beam repo dir
pip install yapf==0.29.0
git diff HEAD --name-only | grep "\.py$" | xargs yapf --in-place

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

apply lint and format
Comment on lines 1192 to 1197
# [START model_bigqueryio_write_with_storage_write_api]
quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)
# [END model_bigqueryio_write_with_storage_write_api]
Contributor

Writing with the Storage API currently has some limitations on supported data types, due to what types are currently supported at the cross-language boundary. It is important to note that Python's decimal.Decimal is needed to write a NUMERIC BigQuery type. Similarly, the Beam Timestamp type is needed to write a TIMESTAMP BigQuery type.

Also, some other types are not yet supported, like "DATETIME". Might be worth mentioning some of that; here is the full allowed list:

BIGQUERY_TYPE_TO_PYTHON_TYPE = {
    "STRING": str,
    "BOOL": bool,
    "BOOLEAN": bool,
    "BYTES": bytes,
    "INT64": np.int64,
    "INTEGER": np.int64,
    "FLOAT64": np.float64,
    "FLOAT": np.float64,
    "NUMERIC": decimal.Decimal,
    "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
}
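
As a hedged illustration (the field names here are made up, not from the PR), a row dict that satisfies this mapping could look like:

import decimal

from apache_beam.utils.timestamp import Timestamp

# NUMERIC columns must be decimal.Decimal and TIMESTAMP columns must be
# Beam's Timestamp; plain floats or datetime objects would not cross the
# cross-language boundary for these column types.
row = {
    'name': 'widget',                        # STRING
    'in_stock': True,                        # BOOL
    'price': decimal.Decimal('19.99'),       # NUMERIC
    'updated_at': Timestamp.of(1690000000),  # TIMESTAMP (epoch seconds)
}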

Contributor Author

@RyuSA RyuSA May 28, 2023

Thank you for letting me know. I didn't know about these limitations!
I have just added these notes. 👍 0af3174

@HaeSe0ng HaeSe0ng Jul 3, 2023

#27347
Hi @ahmedabu98,
I tried to write a 'TIMESTAMP' type, but I got this error message:

java.lang.IllegalArgumentException: Input schema is not assignable to output schema.
Input schema=Fields:
.......
INFO 2023-07-03T09:27:24.015855Z Field{name=ts, description=, type=LOGICAL_TYPE<beam:logical_type:micros_instant:v1>, options={{}}}
.......
, Output schema=Fields:
.......
INFO 2023-07-03T09:27:24.016054Z Field{name=ts, description=, type=DATETIME, options={{}}}
.......

I looked into the Beam source code and found that the input mapping and output mapping do not match in the schema validation code. I think the output mapping should be FieldType.logicalType(SqlTypes.TIMESTAMP).
Is there a way to avoid this error, or is the TIMESTAMP type not supported yet?


I looked into the code again and found this.
It seems that beam:logical_type:millis_instant:v1 is translated as FieldType.DATETIME in SchemaTranslation, which matches the output mapping.
To use MillisInstant, LogicalType.register_logical_type(MillisInstant) should be added to the pipeline code.

Example

LogicalType.register_logical_type(MillisInstant)
quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
)
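
As a hedged note (module path based on the Beam Python sources; worth verifying against your SDK version), the names used above come from:

# LogicalType and MillisInstant are defined in apache_beam.typehints.schemas.
from apache_beam.typehints.schemas import LogicalType, MillisInstant

# Registering MillisInstant before building the pipeline maps TIMESTAMP
# fields to beam:logical_type:millis_instant:v1, which SchemaTranslation
# renders as FieldType.DATETIME, matching the output schema.
LogicalType.register_logical_type(MillisInstant)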

Contributor

@ahmedabu98 ahmedabu98 Jul 6, 2023

Thanks for looking into this @HaeSe0ng!

@Abacn do you know if it's sufficient for us to include the line LogicalType.register_logical_type(MillisInstant) in the connector? e.g. here:

else:
  # Storage Write API

Or will we have to tell users to add this line themselves?

Where does LogicalType.register_logical_type(MillisInstant) come from? I'm trying to find a workaround that's usable for now, since this isn't fixed in v2.49.0. My JSON schema is correctly converted to a bigquery.TableSchema instance, but when passing it to Apache Beam it gives the error you mentioned above.

In my case the TIMESTAMP type is incorrectly translated to DATETIME (LOGICAL_TYPE<beam:logical_type:micros_instant:v1> → DATETIME NOT NULL).

I'd love to help solve this; please let me know if there's anything I can help with, testing, etc.

Contributor
@JoeCMoore are you getting this error even when setting LogicalType.register_logical_type(MillisInstant)?


@ahmedabu98 No, this was a workaround for the bug. If you need any more details I'd be happy to help.

@ahmedabu98
Contributor

Also for the STORAGE_WRITE_API use case, we should mention that an expansion service would have to be built before running the pipeline.

If running from a Beam git clone, they can build it with ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build, then the pipeline will choose that jar by default at run-time.

If they want to specify their own expansion service, they can build it and pass it with the expansion_service parameter of WriteToBigQuery:

num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
expansion_service=None):
"""Initialize a WriteToBigQuery transform.

@Abacn
Contributor

Abacn commented May 26, 2023

@ahmedabu98 if running a released version of Beam, the expansion service can be automatically downloaded and started. The only thing needed is a Java environment (to run the expansion service).

@RyuSA
Contributor Author

RyuSA commented May 28, 2023

As @ahmedabu98 mentioned, some of the CI jobs failed due to the expansion service error.

RuntimeError: /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Coverage_Commit/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/java/io/google-cloud-platform/expansion-service/build/libs/beam-sdks-java-io-google-cloud-platform-expansion-service-2.49.0-SNAPSHOT.jar not found. 
Please build the server with
cd /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Coverage_Commit/src/sdks/python/test-suites/tox/py38/build/srcs; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:build

Can I add a Gradle task sdks:java:io:google-cloud-platform:expansion-service:build to sdks:python:test-suites:tox:py38:preCommitPyCoverage and pythonPreCommit?

@ahmedabu98
Contributor

Can I add a Gradle task sdks:java:io:google-cloud-platform:expansion-service:build to sdks:python:test-suites:tox:py38:preCommitPyCoverage and pythonPreCommit?

Is there another workaround? Adding sdks:java:io:google-cloud-platform:expansion-service:build as a dependency will increase the build time by quite a bit (mainly due to building all of google-cloud-platform).

Contributor

@ahmedabu98 ahmedabu98 left a comment

left some suggestions to clarify things a little

RyuSA and others added 2 commits June 6, 2023 23:54
Python SDK has some limitations on types.

Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
@RyuSA
Contributor Author

RyuSA commented Jun 6, 2023

@ahmedabu98

Is there another workaround?

I tried to make small changes to the CI pipeline, but this is not a good approach because it would either require a lot of changes or result in longer build times.

How about creating a new snippets.py.txt and adding the STORAGE_WRITE_API example code to that file instead of snippets.py? The root cause is that the Python runtime in the CI pipeline evaluates the STORAGE_WRITE_API implementation and tries to load the jar file. I think the Python runtime does not evaluate snippets.py.txt.

  • Pros: small changes, few side effects, and the build time is fine.
  • Cons: the Python runtime only verifies the examples in snippets.py, so we would have to verify the examples in snippets.py.txt ourselves.

move examples that requires expansion-service from snippets.py
@codecov

codecov bot commented Jun 6, 2023

Codecov Report

Merging #26889 (0e02469) into master (9270ee3) will increase coverage by 0.15%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master   #26889      +/-   ##
==========================================
+ Coverage   71.97%   72.13%   +0.15%     
==========================================
  Files         747      836      +89     
  Lines      101306   102091     +785     
==========================================
+ Hits        72920    73647     +727     
- Misses      26927    26985      +58     
  Partials     1459     1459              
Flag Coverage Δ
python 81.09% <100.00%> (+0.13%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...s/python/apache_beam/examples/snippets/snippets.py 73.18% <100.00%> (-3.53%) ⬇️

... and 183 files with indirect coverage changes


@ahmedabu98
Contributor

Hey, sorry for the delay, I was on vacation for a while.

Would it be feasible to create a new model_bigqueryio function for the Storage API write method? Then we can add a new test (also just for the Storage API) that we can trigger with the rest of the GCP xlang tests. We can add a few decorators to the new test to make sure it runs on the GCP xlang postcommits and gets skipped otherwise, e.g.:

@pytest.mark.uses_gcp_java_expansion_service
@unittest.skipUnless(
    os.environ.get('EXPANSION_PORT'),
    "EXPANSION_PORT environment var is not provided.")

This means it won't run with the Python Coverage Commit tests, but will run on PostCommit_Python_Xlang_Gcp_Direct and PostCommit_Python_Xlang_Gcp_Dataflow

Let me know what you think of this suggestion :)
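
As a hedged sketch (the class and method names are illustrative, not the code that was merged), such a guarded test could be laid out like:

import os
import unittest

import pytest


@pytest.mark.uses_gcp_java_expansion_service
@unittest.skipUnless(
    os.environ.get('EXPANSION_PORT'),
    "EXPANSION_PORT environment var is not provided.")
class BigQueryXlangSnippetTest(unittest.TestCase):
  def test_model_bigqueryio_xlang(self):
    # The cross-language snippet would be exercised here; it only runs when
    # the GCP xlang postcommit suites provide EXPANSION_PORT.
    pass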

@RyuSA
Contributor Author

RyuSA commented Jun 21, 2023

@ahmedabu98 Hi, I'm also sorry for my delay. I got sick and had to deal with a P1 body issue. 😭

Let me know what you think of this suggestion :)

Yes, it makes sense. 👍
I just created a new function model_bigqueryio_xlang for the cross-language BigQuery source, and a new empty test test_model_bigqueryio_xlang. Do you think I understood your suggestion correctly?

Contributor

@ahmedabu98 ahmedabu98 left a comment
I think this should do it

sdks/python/apache_beam/examples/snippets/snippets.py
sdks/python/apache_beam/examples/snippets/snippets_test.py
RyuSA and others added 3 commits June 24, 2023 09:19
Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
@ahmedabu98
Contributor

Run Python_Xlang_Gcp_Direct PostCommit

@ahmedabu98
Contributor

Run Python_Xlang_Gcp_Dataflow PostCommit

@ahmedabu98
Contributor

Ahh looks like we're running into a validation error:

RuntimeError: java.lang.RuntimeException: java.lang.IllegalArgumentException: Input schema is not assignable to output schema.

This check is done at pipeline construction time; it compares the input data schema with the table schema. I think we can bypass it by giving it a table that doesn't exist, which will skip the validation check.

pipeline, write_project='', write_dataset='', write_table=''):
"""Examples for cross-language BigQuery sources and sinks."""

table_spec = 'clouddataflow-readonly:samples.weather_stations'
Contributor
Suggested change
table_spec = 'clouddataflow-readonly:samples.weather_stations'
table_spec = 'clouddataflow-readonly:samples.<non-existent-table>'

So everything else can stay the same; just put a table name that doesn't exist. You can even use str(uuid.uuid4()) in the name to make sure.
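
As a hedged sketch (the suffix scheme is illustrative), building such a table spec could look like:

import uuid

# A table that is guaranteed not to exist, so the construction-time
# schema-validation check against the destination table is skipped.
table_spec = (
    'clouddataflow-readonly:samples.nonexistent_' + uuid.uuid4().hex)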

Contributor Author

Thank you, I applied your suggestion. It seems to work! 👍

to avoid CI error
@RyuSA
Contributor Author

RyuSA commented Jun 25, 2023

Run Python_Xlang_Gcp_Direct PostCommit

@RyuSA
Contributor Author

RyuSA commented Jun 25, 2023

Run Python_Xlang_Gcp_Dataflow PostCommit

Contributor

@ahmedabu98 ahmedabu98 left a comment

Looks like tests are running and passing now, thank you for these changes! LGTM

@Abacn for final review and merge

@Abacn
Contributor

Abacn commented Jul 11, 2023

Thanks! LGTM, merging for now.

@Abacn Abacn merged commit 6eb1b7e into apache:master Jul 11, 2023
62 of 63 checks passed
aleksandr-dudko pushed a commit to aleksandr-dudko/beam that referenced this pull request Jul 17, 2023
…` for Python SDK (apache#26889)

* Update BigQuery document; Python SDK(apache#26693)

Add usage of BigQuery StorageWriteAPI in Python.

---------

Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
bullet03 pushed a commit to akvelon/beam that referenced this pull request Aug 11, 2023
…` for Python SDK (apache#26889)

* Update BigQuery document; Python SDK(apache#26693)

Add usage of BigQuery StorageWriteAPI in Python.

---------

Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
cushon pushed a commit to cushon/beam that referenced this pull request May 24, 2024
…` for Python SDK (apache#26889)

* Update BigQuery document; Python SDK(apache#26693)

Add usage of BigQuery StorageWriteAPI in Python.

---------

Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>