[BEAM-10135] Add Python wrapper for Jdbc Write external transform #12023
Conversation
Thanks @piotr-szuberski! I left a few comments/suggestions
Quoted from the diff (module docstring):

    transforms.

    If you start Flink's Job Server, the expansion service will be started on
    port 8097. This is also the configured default for this transform. For a
It looks to me like the default is to start an expansion service with sdks:java:io:expansion-service:shadowJar, not to use port 8097. Is this out of date?
There are two approaches:
- Start the Flink job server using sdks:java:io:expansion-service:runShadow, which runs the expansion service at localhost:8097.
- Create shadow jars for the expansion service and the Flink job server via Gradle (sdks:java:io:expansion-service:shadowJar and runners:flink:1.10:job-server:shadowJar) and leave expansion_service as None; those jars will then be used with random ports.

I didn't dig into it, but in (2) it first tries to run the expansion service, and when that fails it runs the Flink job server, which under the hood runs the expansion service.

My main examples for the code I wrote were the kafka.py and generate_sequence external transforms.
But where does the code for (1) live on the Python side? As far as I can tell, the only two options for this transform are either overriding the expansion service with the expansion_service= parameter, or the default: running shadowJar with BeamJarExpansionService (what you have in (2)). I can't find any logic that looks for an expansion service on port 8097. Am I missing something?
On the Python side you can pass expansion_service='localhost:8097' instead of expansion_service=BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar'). In most cases a user of the transform will start a container with the Flink job server, which exposes localhost:8097, and use that address.
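The distinction described in this thread can be sketched as plain logic. This is a hedged illustration, not Beam's actual implementation: the helper name `resolve_expansion_service` is hypothetical, and the jar label is just the Gradle target discussed above. An address string targets an already-running service; None falls back to the bundled jar.

```python
def resolve_expansion_service(expansion_service=None):
    """Hypothetical helper sketching the fallback described in this thread.

    - A string like 'localhost:8097' targets an already-running expansion
      service (e.g. one started by the Flink job server).
    - None means "use the default": start the bundled expansion-service
      jar ('sdks:java:io:expansion-service:shadowJar') on a random port.
    """
    if expansion_service is not None:
        return ('address', expansion_service)
    # Default path discussed in the thread: a BeamJarExpansionService-style jar.
    return ('jar', 'sdks:java:io:expansion-service:shadowJar')

print(resolve_expansion_service('localhost:8097'))
print(resolve_expansion_service())
```

In either case the caller only sets (or omits) one parameter; the service startup itself is transparent.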
OK, but my concern is that this docstring indicates using the expansion service at localhost:8097 is the "configured default for this transform," which doesn't seem to be the case. It looks to me like the default is to use BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar'), which will start up an expansion service in a subprocess.
I think that's a great default and should be used in the vast majority of cases; I don't think there's any reason to build JdbcIO into the Flink expansion service and encourage users to reference that. My understanding is the Flink expansion service was just a temporary solution before we had the ability to run expansion services with BeamJarExpansionService, so there's no need to use it anymore (@chamikaramj, @robertwb to confirm or correct me).
I think you should:
- Update the docstring to indicate the default is to use sdks:java:io:expansion-service:shadowJar, or even not reference it at all since it should happen transparently for the user.
- Drop mention of the Flink job server and its expansion service.
I've removed most of that information, since when the default is to use BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') it is redundant.
We updated Kafka to build or download the IO expansion jar by default. You can probably follow the same model here. I updated the documentation for Kafka recently: https://github.com/apache/beam/pull/11928/files
BTW, one downside of a single IO expansion jar is that its size increases linearly with the number of cross-language wrappers we have. This will increase pipeline setup time for users, since a larger jar has to be staged. What is the size increase of the IO expansion jar due to this wrapper?
cc: @robertwb
@chamikaramj The jar is about 1 MB larger.
I've added changes to the docs similar to Kafka in #12145.
Staging time is a valid concern; thanks for calling this out. I think 1 MB is a small price to pay for the convenience here; we could have dozens of wrappers of that size without meaningfully affecting staging time. If it were tens of MB, or we were getting into a long tail of obscure IOs, that would start to be a different story.
Yeah, 1 MB is fine. I think we should continue to monitor the size of this dependency in future Python wrappers that add to it, though, to make sure the size does not grow significantly.
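Such monitoring could be as simple as a size assertion against the built jar. A stdlib-only sketch (the jar path and the 50 MB budget are made up for illustration; a tiny stand-in "jar" is created so the snippet is self-contained):

```python
import os
import zipfile

def jar_size_mb(path):
    """Return the on-disk size of a jar (zip) file in megabytes."""
    return os.path.getsize(path) / (1024.0 * 1024.0)

# Build a tiny stand-in "jar" so the sketch runs without a real Beam build.
with zipfile.ZipFile('demo.jar', 'w') as jar:
    jar.writestr('META-INF/MANIFEST.MF', 'Manifest-Version: 1.0\n')

BUDGET_MB = 50  # hypothetical budget for the IO expansion jar
size_mb = jar_size_mb('demo.jar')
assert size_mb < BUDGET_MB, 'expansion jar grew past the budget'
os.remove('demo.jar')
```

A check like this in CI would flag a wrapper that suddenly pulls in a large dependency tree.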
Quoted from the diff (module docstring):

    Please refer to the portability documentation on how to do that. Flink Users
    can use the built-in Expansion Service of the Flink Runner's Job Server. The
    expansion service address has to be provided when instantiating the
    transforms.
I don't think we should indicate this is an ExternalTransform in the module docstring, it should be transparent to the user (I received similar guidance for the xlang SqlTransform: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql.py).
This is useful information, though; maybe it could be moved to the docstring for WriteToJdbc.__init__ to document the expansion_service param?
If I move it to the Write transform, then it should also be in the Read transform, which is the next step of this task, so the information would be duplicated. It is already done the same way in kafka.py and generate_sequence.py.
@TheNeuralBit Thanks for the review! I'll take care of it on Monday; unfortunately I can't do it earlier.
(force-pushed from 9c88e65 to 652f733)
@TheNeuralBit I've made the requested changes. If you have further suggestions, go ahead.
(force-pushed from fd5c69d to fbfdf22)
Run Python PreCommit

3 similar comments:

Run Python PreCommit
Run Python PreCommit
Run Python PreCommit
@TheNeuralBit ping
We need this test to run continuously somewhere, and with the unittest.skipIf checks you have now, I don't think it ever will. I think if you ensure the required packages are in the test extra, this test will at least get pulled into the Python precommit and run on the fn_api_runner, which should be able to run xlang transforms.
Also, you can test it on Flink and Spark. If you add @attr('UsesCrossLanguageTransforms') like in generate_sequence.py, it will be run in XVR_Flink and XVR_Spark:

    @attr('UsesCrossLanguageTransforms')

You can verify it's run in XVR_Flink locally by running ./gradlew :runners:flink:1.10:jobserver:validatesCrossLanguageRunnerPythonUsingJava and checking the output.
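The attr marking works by tagging a test with an attribute that the test runner filters on. Below is a stdlib stand-in for nose's attrib plugin, written only to show the mechanism (the real decorator comes from nose.plugins.attrib; the test function name here is hypothetical):

```python
def attr(*names):
    """Minimal stand-in for nose.plugins.attrib.attr: tag a test so a
    runner can select it (e.g. only cross-language-capable runners)."""
    def decorate(func):
        for name in names:
            setattr(func, name, True)
        return func
    return decorate

@attr('UsesCrossLanguageTransforms')
def test_jdbc_write_xlang():
    pass  # placeholder body; the real test would run the pipeline
```

A runner configured for XVR_Flink/XVR_Spark would then collect only tests carrying this attribute.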
Quoted from the diff (test code):

    @unittest.skipIf(sqlalchemy is None, "sql alchemy package is not installed.")
    @unittest.skipIf(
        PostgresContainer is None, "testcontainers package is not installed")
I think we should add these packages to the test extra:
Lines 181 to 198 in 92170e8 (setup.py):

    REQUIRED_TEST_PACKAGES = [
        'freezegun>=0.3.12',
        'nose>=1.3.7',
        'nose_xunitmp>=0.4.1',
        'pandas>=0.24.2,<1; python_full_version < "3.5.3"',
        'pandas>=0.25.2,<1; python_full_version >= "3.5.3"',
        'parameterized>=0.7.1,<0.8.0',
        # pyhamcrest==1.10.0 doesn't work on Py2. Beam still supports Py2.
        # See: https://github.com/hamcrest/PyHamcrest/issues/131.
        'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
        'pyyaml>=3.12,<6.0.0',
        'requests_mock>=1.7,<2.0',
        'tenacity>=5.0.2,<6.0',
        'pytest>=4.4.0,<5.0',
        'pytest-xdist>=1.29.0,<2',
        'pytest-timeout>=1.3.3,<2',
        'rsa<4.1; python_version < "3.0"',
    ]
cc: @aaltay in case there are any concerns with adding these dependencies. For licenses, they're both under Apache 2.0 (and they're also just test dependencies).
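The additions would be requirement strings with PEP 508 environment markers, keeping them off Python 2 (which testcontainers does not support). A sketch: the testcontainers pin matches what the thread later settles on, while the sqlalchemy pin is illustrative, not taken from the PR.

```python
import sys

# Hypothetical additions to REQUIRED_TEST_PACKAGES in setup.py.
NEW_TEST_PACKAGES = [
    'sqlalchemy>=1.3,<2.0; python_version >= "3.5"',    # illustrative pin
    'testcontainers<3.0.3; python_version >= "3.5"',    # pin from the thread
]

# The marker means pip only installs these on Python >= 3.5; the same
# condition can be checked by hand against the running interpreter:
marker_applies = sys.version_info[:2] >= (3, 5)
```

Note the version comparison uses tuples, not strings: '3.10' >= '3.5' is False lexically, but (3, 10) >= (3, 5) is True.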
Thank you. I agree. It is fine to add these as test-only dependencies.
Is it OK if the tests run for Python >= 3.5 only? testcontainers doesn't support Python 2, and neither does pytest-postgresql.
I think that's fine, you can just add a skipIf based on the python version
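A version-based skipIf could look like the following sketch; the test class and its placeholder body are hypothetical, and the skip message reflects the packages discussed above.

```python
import sys
import unittest

@unittest.skipIf(
    sys.version_info < (3, 5),
    'testcontainers and pytest-postgresql require Python 3.5+')
class JdbcWriteTest(unittest.TestCase):
    def test_placeholder(self):
        # The real test would spin up a Postgres container and run the
        # WriteToJdbc pipeline; this stands in for that body.
        self.assertTrue(True)

suite = unittest.defaultTestLoader.loadTestsFromTestCase(JdbcWriteTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

On Python 2 the whole class is reported as skipped rather than failing on a missing import.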
I left those skipIfs as they were and added testcontainers only for Python 3: 'testcontainers<3.0.3; python_version >= "3.5"'
(force-pushed from fbfdf22 to bb9e342)
@TheNeuralBit I've made the requested changes. I've made only the optional params into kwargs; if you think all params should be kwargs, I'll change it.
That sounds great. I'm fine with the required params being positional args, given what I discussed in #12023 (comment). Sorry for the misunderstanding.
Yeah, it was helpful to have the separation of concerns when reviewing, but I agree it will be better to have a single PR that we can test together.
@TheNeuralBit I've created a separate PR with write and read: #12145
Thanks for adding this.
Quoted from the diff:

    from apache_beam.transforms.external import ExternalTransform
    from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

    __all__ = ['WriteToJdbc']
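NamedTupleBasedPayloadBuilder builds the external-transform payload from a typed NamedTuple. A stdlib-only sketch of what such a config schema could look like; the field names and values are hypothetical (loosely modeled on the JDBC parameters discussed in this thread), not the actual schema in apache_beam/io/jdbc.py.

```python
import typing

# Hypothetical config schema for illustration only.
class JdbcWriteConfig(typing.NamedTuple):
    driver_class_name: str
    jdbc_url: str
    username: str
    password: str
    statement: str

config = JdbcWriteConfig(
    driver_class_name='org.postgresql.Driver',
    jdbc_url='jdbc:postgresql://localhost:5432/testdb',
    username='postgres',
    password='postgres',
    statement='INSERT INTO tbl VALUES (?, ?)',
)
```

The typed fields are what allow the payload builder to encode the config into a schema the Java expansion service can decode.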
Just curious. Were you able to successfully test this with portable Flink/Spark and Dataflow ?
Yes, with FlinkRunner, SparkRunner, and DirectRunner. On Dataflow with sdk_location and dataflow_worker_jar set I wasn't successful; I got INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2020/07/01 18:25:55 Failed to obtain provisioning information: failed to get manifest\n\tcaused by:\nrpc error: code = Unavailable desc = transport is closing\n' repeated multiple times.
""" | ||
Initializes a write operation to Jdbc. | ||
|
||
:param driver_class_name: name of the jdbc driver class |
Should these parameters be specified as strings ? Probably good if we can clarify that here.
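One way to make the expected types explicit is Sphinx-style :type fields next to each :param. A sketch of a clarified docstring, not the actual Beam one; the function name and the example driver/URL values are only illustrative:

```python
def write_to_jdbc_doc_example(driver_class_name, jdbc_url):
    """Sketch of a docstring that documents parameter types explicitly.

    :param driver_class_name: fully qualified name of the JDBC driver
        class, e.g. 'org.postgresql.Driver'
    :type driver_class_name: str
    :param jdbc_url: full JDBC connection URL,
        e.g. 'jdbc:postgresql://localhost:5432/mydb'
    :type jdbc_url: str
    """
    # Mirror the documented contract at runtime for the sketch.
    assert isinstance(driver_class_name, str)
    assert isinstance(jdbc_url, str)
    return driver_class_name, jdbc_url
```

Including a concrete example value in each :param line answers the "should these be strings?" question directly in the rendered docs.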
I added those changes in PR #12145.
Add python wrapper for jdbc write transform
Note: this is a very limited transform; it can't insert timestamp, boolean, or bytes values, as the actual row_coder is the limitation here.
This PR is dependent on #12022