
Conversation

@dianfu (Contributor) commented May 17, 2019

What is the purpose of the change

This pull request adds a from_elements API to TableEnvironment, a convenience API for creating a table from a collection of elements.
It works as follows (a usage sketch follows the list):

  1. Serializes the Python data to a local file via PickleSerializer
  2. Reads the local file, deserializes the Python objects, and creates a DataStream from the deserialized elements on the Java side
  3. Creates a Table from the created DataStream
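
A minimal usage sketch of the new API (assuming PyFlink is installed; the environment setup follows the Flink 1.9-era Python API, and the element values and field names are illustrative, not taken from the PR):

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment

# Set up a batch TableEnvironment (1.9-era API; assumed here, not shown in the PR).
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env)

# Each tuple becomes one row; the list of names is used as the schema.
table = t_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'message'])

Under the hood, this triggers exactly the three steps above: the elements are pickled to a local file, deserialized on the Java side into a DataStream, and wrapped in a Table.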

Brief change log

  • Adds the from_elements API to TableEnvironment
  • Adds PythonUtil to create a DataStream/DataSet from a file containing serialized Python objects
  • Adds PythonTableUtil to create a Table from a DataStream
  • Adds PythonTypeUtil to convert the deserialized objects according to the schema

Verifying this change

This change added tests and can be verified as follows:

  • Added unit test test_calc.test_from_elements

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot (Collaborator) commented May 17, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details

The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@sunjincheng121 (Member) commented May 27, 2019

The CI threw a TypeError for a Python test case. I have restarted it. Please make sure all the test cases pass before the review.
@flinkbot approve-until architecture

@sunjincheng121 (Member) left a comment

Thanks for the PR @dianfu!
The PR overall looks good! I have left a few comments. Please let me know what you think.

One open question:
Should we put the Python code into the flink-table-api-java-bridge module? Also, these days I have been thinking about whether we can add Java code to flink-python.

Best,
Jincheng

def __init__(self):
# On Python 2.6, we can't write bytearrays to streams, so we need to convert them
# to strings first. Check if the version number is that old.
self._only_write_strings = sys.version_info[0:2] <= (2, 6)
@sunjincheng121 (Member):

I think we do not need this check, since we only support Python 2.x from 2.7 onward. What do you think?

class FramedSerializer(Serializer):
"""
Serializer that writes objects as a stream of (length, data) pairs,
where C{length} is a 32-bit integer and data is C{length} bytes.
@sunjincheng121 (Member):

What does C{length} mean, i.e., the C prefix?

"""
Serializes objects using Python's pickle serializer:
http://docs.python.org/2/library/pickle.html
@sunjincheng121 (Member):

It's better to follow Python 3, so how about referring to https://docs.python.org/3/library/pickle.html instead?
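
For reference, a minimal round-trip with the Python 3 pickle module the link points to (standard library only; not the PR's code):

import pickle

data = {'id': 1, 'message': 'Hi'}
blob = pickle.dumps(data)          # serialize to bytes
assert pickle.loads(blob) == data  # deserialize back to an equal object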

return map(lambda x: [x], self.load_stream(stream))


class FramedSerializer(Serializer):
@sunjincheng121 (Member):

FramedSerializer is fine with me; I guess the name refers to network terminology (the concept of data frames), and Spark also uses this name. But I would like to leave a suggestion: name it VarLengthDataSerializer, since it is a very specific serializer, i.e., (length, data) pairs, for our case. What do you think?

for obj in iterator:
self._write_with_length(obj, stream)

def load_stream(self, stream):
@sunjincheng121 (Member):

Currently, "stream" and "batch" are a bit confusing in Flink. So how about using dump_to_stream and load_from_stream?
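
For context, a minimal sketch of the (length, data) framing these comments discuss; the struct format and method names are illustrative, not the PR's actual code:

import struct

class FramedSerializerSketch(object):
    # Writes each object as a 4-byte big-endian length prefix followed by
    # that many bytes of data; dumps/loads would come from a concrete
    # subclass, e.g. one based on pickle.

    def _write_with_length(self, obj, stream):
        data = self.dumps(obj)
        stream.write(struct.pack('>i', len(data)))  # 32-bit length prefix
        stream.write(data)

    def _read_with_length(self, stream):
        length = struct.unpack('>i', stream.read(4))[0]
        return self.loads(stream.read(length))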


<!-- python dependencies -->

<dependency>
@sunjincheng121 (Member):

I think we should add the NOTICE file for this dependency.
We can take a look at: https://cwiki.apache.org/confluence/display/FLINK/Licensing

* the objects are ints, etc. Returns an ArrayList if it needs to
* contain arbitrary objects (such as lists).
*
* <p>Unpickle array.array generated by Python 2.6.
@sunjincheng121 (Member):

Do Python 2.7 and 3.x also need these utils?

@dianfu (Contributor, author):

I see your point. +1 to remove the code handling Python <= 2.6.

import org.apache.flink.table.typeutils.PythonTypeUtil
import org.apache.flink.types.Row

object PythonTableUtil {
@sunjincheng121 (Member):

PythonTableUtil -> PythonTableUtils?
Same as TypeCheckUtils.

* Utility class that contains helper methods to create a DataStream/DataSet from
* a file which contains Python objects.
*/
public final class PythonUtil {
@sunjincheng121 (Member):

PythonUtil -> PythonUtils?

import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

object PythonTypeUtil {
@sunjincheng121 (Member):

Can we place this util in org.apache.flink.table.python?
BTW: since this class contains only one public method, convertTo, and cannot be used by Java users, I suggest moving convertTo into PythonTableUtil as a private method. What do you think?

@aljoscha (Contributor) commented May 27, 2019

I agree with @sunjincheng121, we must not put Python-related code into unrelated packages.

@sunjincheng121 (Member):

Thanks for the reply @aljoscha!

As we (@aljoscha, @dianfu, and I) discussed offline, we reached a consistent solution: we will put the Python-related Java code into the flink-python module and add a classifier named java-binding for the JAR. So, I would appreciate it if you could update the PR according to our decision. @dianfu :)
Best, Jincheng

@dianfu (Contributor, author) commented May 29, 2019

Sounds good. I will rebase the PR.

@dianfu (Contributor, author) commented May 29, 2019

@sunjincheng121 Have addressed your comments and also moved all the Python related Java files to module flink-python.

@dianfu force-pushed the FLINK-12409 branch 2 times, most recently from 1f294dd to 40015b6 on May 29, 2019 at 12:03.
@sunjincheng121 (Member) left a comment

Thanks for the update @dianfu!
I only left a few suggestions. One thing I am not quite sure about is LICENSE.py4j: I think we should put it into both the licenses and licenses-binary folders. I think @zentol and @aljoscha can give us more authoritative advice.

Best,
Jincheng

FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
@sunjincheng121 (Member):

Do you think we should also put LICENSE.py4j into the licenses folder, since we have put py4j-xxx-src.zip into the Flink source repo? What do you think?

@dianfu (Contributor, author), May 30, 2019:

Good catch. I think we should, as py4j-xxx-src.zip is currently bundled in the source distribution.

docs/ops/cli.md (outdated):
- Run Python Table program:

./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
./bin/flink run -py examples/python/table/batch/word_count.py -C <path/to/flink-table.jar> -C <path/to/flink-python-*-java-binding.jar>
@sunjincheng121 (Member):

Here is one concern: this way, we ask the user to keep the path consistent with the cluster, which is a bit inconvenient. So how about having the Flink framework deal with flink-python-*-java-binding.jar and keeping -j for flink-table.jar? In the future a Python user may be a Python Table user or a Python DataStream user, so flink-table.jar is not always needed. What do you think?

@dianfu (Contributor, author):

Makes sense to me, as flink-python-*-java-binding.jar is always needed for Python jobs.

:param elements: The elements to create table from.
:param schema: The schema of the table.
:param verify_schema: Whether to verify the elements against the schema.
@sunjincheng121 (Member):

Okay, cool: if some data is generated by the program and we are already sure it does not need checking, in that case we can set verify_schema=False.
+1
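
A minimal sketch of that case (reusing the t_env from the earlier sketch; the generated rows are illustrative):

# Rows produced by the program are already known to match the schema,
# so the per-element verification can safely be skipped.
table = t_env.from_elements(
    [(i, str(i)) for i in range(1000)],
    ['id', 'value'],
    verify_schema=False)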

@sunjincheng121 (Member):

@flinkbot attention @zentol

@dianfu (Contributor, author) commented May 30, 2019

@sunjincheng121 Thanks a lot for your review. Have updated the PR accordingly.

@sunjincheng121 (Member):

Thanks for the quick update! @dianfu
LGTM.
From my side, +1 to merge.

@sunjincheng121 (Member):

Thanks for the rebase, Merging...

sunjincheng121 pushed a commit to sunjincheng121/flink that referenced this pull request Jun 3, 2019
Note: currently only the Flink planner is supported. The Blink planner will be supported after planner discovery is supported, which is part of the work of FLIP-32.

This closes apache#8474
asfgit closed this in f27c40d on Jun 3, 2019.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

flink-python
@zentol (Contributor):

We already have a flink-python module in flink-dist, so we now have ambiguous licensing declarations. @sunjincheng121

@sunjincheng121 (Member):

Thanks for pointing this out @zentol!
In our (@aljoscha and me) plan, we want to remove flink-libraries/flink-python later; for now, we add the classifier "java-binding" for the JAR. On the licensing I agree with you: we really have ambiguous licensing declarations here. :(
We thought carefully about whether to change the module name, but the name flink-python is too apt, other names are not suitable, and the long-term goal is to remove flink-libraries/flink-python, so we still want to use the name flink-python. In order to eliminate the ambiguity, could we add a short explanation in the licensing? Or do you have better suggestions?

@zentol (Contributor):

Well, you could just call it flink-api-python or something; that doesn't seem that inappropriate.

I don't believe we should be re-using artifactIds between releases if they are fundamentally different artifacts; it is for the same reason that we have not added another flink-ml module but named it flink-ml-parent instead.

@sunjincheng121 (Member), Jun 9, 2019:

Currently, module names in Flink follow a flink-<language> pattern, such as flink-java and flink-scala. The goal for flink-python is to add user-defined function support in the next release; for UDFs we will add a data service, a state service, etc., so it is not only at the API level. These functions need to be integrated with Beam. The amount of code inside Flink will not be very large, so the current plan is to distinguish these pieces by packages or folders (for Python). So, if we should not re-use the artifactId, how about using flink-py? @zentol @aljoscha

BTW: I would appreciate it if we could discuss this module name issue in the [DISCUSS] FLIP-38 Support python language in flink Table API thread: https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8/edit?usp=sharing

@rmetzger requested a review from zentol on June 5, 2019 at 11:37.
@dianfu deleted the FLINK-12409 branch on June 10, 2020 at 03:04.