Contributor

@Juta Juta commented Mar 18, 2019

This is part of a series of PRs with the goal of making Apache Beam Python 3 compatible. The proposal with the outlined approach is documented here: https://s.apache.org/beam-python-3.

This PR adds more integration tests to the postcommit jobs on the direct and Dataflow runners.

Post-Commit Tests Status (on master branch)

[Build-status badge table. Columns: Lang, SDK, Apex, Dataflow, Flink, Gearpump, Samza, Spark. Rows: Go, Java, Python. Badge images not preserved.]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

Contributor Author

Juta commented Mar 18, 2019

Run Python PostCommit

Contributor Author

Juta commented Mar 19, 2019

Run Python PostCommit

@Juta Juta force-pushed the bq-io-tests branch 5 times, most recently from 49cbdeb to a12b64b Compare March 19, 2019 15:55
Contributor Author

Juta commented Mar 19, 2019

Run Python PostCommit

@Juta Juta force-pushed the bq-io-tests branch 2 times, most recently from fc57d0e to 98d7f0f Compare March 20, 2019 09:45
Contributor Author

Juta commented Mar 20, 2019

Run Python PostCommit

@Juta Juta force-pushed the bq-io-tests branch 2 times, most recently from 9a05dd2 to 9aa9f37 Compare March 20, 2019 14:12
Contributor Author

Juta commented Mar 20, 2019

Run Python PostCommit

Contributor Author

Juta commented Mar 20, 2019

cc: @pabloem @ttanay @tvalentyn

Contributor

@tvalentyn tvalentyn left a comment

Thanks a lot, @Juta, for the changes and the cleanup. I left some comments; please take a look.


  def process(self, elem):
    try:
      if isinstance(elem, bytes):
Contributor

Can we always decode here? It is better to have a clear expectation of input arguments as much as possible on Python 3: either always encoded bytes, or always strings, but not mixing the two.
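
A minimal sketch of why mixing the two types is fragile on Python 3. The variable names are illustrative and not taken from the PR:

```python
text = 'user1,teamA,10'
payload = text.encode('utf-8')

# On Python 3, str and bytes never compare equal, even when they hold
# the same characters.
same = (text == payload)

# Using a str separator on a bytes value raises TypeError on Python 3.
try:
    payload.split(',')
    mixed_split_works = True
except TypeError:
    mixed_split_works = False

# Decoding once at the boundary gives a single, predictable type.
fields = payload.decode('utf-8').split(',')
```

A function that accepts only one of the two types avoids having to branch on isinstance checks like the one in the snippet under review.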


Contributor Author

The integration test uses pubsub as its input source, while the unit test currently passes strings. That's why we cannot always decode.

Contributor

I am not sure I follow. Could you please elaborate: what exactly in either pubsub or the unit test makes the input type inconsistent here?

Contributor Author

@Juta Juta Mar 22, 2019

Contributor

Ok, the fix would then be to:

  1. Expect elem to be a string.
  2. Decode the messages as they arrive from PubSub using | 'DecodeString' >> Map(lambda b: b.decode('utf-8')) in the example pipeline. Note that there are several example pipelines that will need this change.

Note that we used to have a ReadStringsFromPubSub method, which has been deprecated:

@deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')

We may want to revisit this deprecation in the future.
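
The two-step fix above could look roughly like the sketch below. It is plain Python rather than a real pipeline: decode_utf8, parse_game_event, and the sample payload are hypothetical names chosen for illustration, and the Beam wiring is shown only as a comment.

```python
def decode_utf8(payload):
    """Decode one Pub/Sub payload (bytes) into text."""
    return payload.decode('utf-8')


def parse_game_event(line):
    """Hypothetical parser that expects str and never sees bytes."""
    user, team, score, timestamp = line.split(',')[:4]
    return {
        'user': user,
        'team': team,
        'score': int(score),
        'timestamp': int(timestamp),
    }


# Simulated Pub/Sub payloads; real messages would arrive as bytes too.
raw_messages = [b'user1,teamA,10,1553000000,2015-11-02 09:09:28.224']

# Decode at the boundary, then hand only text to the parsing step.
events = [parse_game_event(decode_utf8(m)) for m in raw_messages]

# In a Beam pipeline the same boundary would sit right after the read:
#   ... | 'DecodeString' >> beam.Map(decode_utf8) | ...
```

With the decode step in the pipeline, the unit test can keep passing strings and the parsing code needs no isinstance check.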

Contributor Author

@tvalentyn I applied this fix


  # Input event containing user, team, score, processing time, window start.
- INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
+ INPUT_EVENT = b'user1,teamA,10,%d,2015-11-02 09:09:28.224'
Contributor

I am not sure if this should be bytes. Does self.pub_client.publish require bytes? If so, we should encode before passing data to that method.

Contributor Author

Yes, self.pub_client.publish requires bytes. In this case I think specifying the input event as bytes is what is expected, because it is passed directly to the client. What do you think?

Contributor

So the question is then: to the humans who read and edit this code, should INPUT_EVENT be textual data or encoded data? I think it's easier to consider it text up until it's time to feed it to pubsub, at which point we can encode it to bytes.

I'd keep INPUT_EVENT as is and change publishing to:

event = self.INPUT_EVENT % self._test_timestamp
self.pub_client.publish(event.encode('utf-8'))

I would expect users to follow a similar pattern in their pipelines, and they might refer to Beam examples for guidance, so I suggest changing this.
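
A self-contained sketch of the same encode-late pattern. FakePublisher is a hypothetical stand-in for the real client, and its single-argument publish mirrors the snippet above rather than any actual Pub/Sub API:

```python
# The event template stays text so it is easy to read and format.
INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'


class FakePublisher:
    """Hypothetical stand-in for a publisher that accepts only bytes."""

    def __init__(self):
        self.published = []

    def publish(self, data):
        if not isinstance(data, bytes):
            raise TypeError('data must be bytes')
        self.published.append(data)


test_timestamp = 1553000000  # illustrative value
publisher = FakePublisher()

# Format as text first, then encode at the last moment before publishing.
event = INPUT_EVENT % test_timestamp
publisher.publish(event.encode('utf-8'))
```

Keeping the constant as str means the bytes conversion happens in exactly one place, next to the call that requires it.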


  # Input events containing user, team, score, processing time, window start.
- INPUT_EVENT = 'user1,teamA,10,%d,2015-11-02 09:09:28.224'
+ INPUT_EVENT = b'user1,teamA,10,%d,2015-11-02 09:09:28.224'
Contributor

I am not sure if this should be bytes. Does self.pub_client.publish require bytes? If so, we should encode before passing data to that method.

To quote https://docs.python.org/3/howto/pyporting.html: "Decode binary data to text as soon as possible, encode text as binary data as late as possible."

Contributor Author

Juta commented Mar 22, 2019

Run Python PostCommit

Contributor Author

Juta commented Apr 2, 2019

Run Python PostCommit

@Juta Juta force-pushed the bq-io-tests branch 2 times, most recently from c11d669 to c548b69 Compare April 2, 2019 10:01
Contributor Author

Juta commented Apr 2, 2019

Run Python PostCommit

Contributor Author

Juta commented Apr 2, 2019

Run Python PostCommit

@Juta Juta changed the title from "[BEAM-6619] [BEAM-6593] Add bigquery integration tests to postcommit" to "[BEAM-6619] [BEAM-6593] Add example integration tests to postcommit" Apr 2, 2019
Contributor

@tvalentyn tvalentyn left a comment

Thanks, @Juta!

Member

aaltay commented Apr 2, 2019

Thank you @Juta and @tvalentyn

@aaltay aaltay merged commit b6800ce into apache:master Apr 2, 2019