-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-18763][python] Support basic TypeInformation for Python DataSt… #13029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 2cb0d81 (Thu Jul 30 12:09:44 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
hequn8128
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shuiqiangchen Thanks a lot for your PR. Left some comments below.
| TypeInformation is the core class of Flink's type system. FLink requires a type information | ||
| for all types that are used as input or return type of a user function. This type information | ||
| class acts as the tool to generate serializers and comparators, and to perform semantic checks | ||
| such as whether the fields that are used as join/grouping keys actually existt. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
existt => exist
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I will revise it.
| in the arrays. | ||
| a) Basic types are indivisible and are considered as a single field. | ||
| b) Arrays and collections are one field. | ||
| c) Tuples and case classes represent as many fields as the class has fields. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have case classes in PyFlink.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will remove it.
| """ | ||
|
|
||
| @abstractmethod | ||
| def is_basic_type(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add type hint: def is_basic_type(self) -> bool:. Same for other methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it's good to add type hints.
| def is_tuple_type(self): | ||
| """ | ||
| Checks if this type information represents a Tuple type. | ||
| Tuple types are subclasses of the Java API tuples. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove this line. Tuple type is supported in Python originally. We should not simply copy Java comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, there should be differences between java and python comments.
| nested fields, in the case of composite types. | ||
| The total number of fields must be at lest 1. | ||
|
|
||
| :return: The number of fields in this type, including its sub-fields (for composit types). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
composite types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I'll revise it.
| .PickledByteArrayTypeInfo()) | ||
|
|
||
|
|
||
| class BigDecimalTypeInfo(WrapperTypeInfo): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decimal should exist in BasicTypeInfo together with BIG_INT_TYPE_INFO. The decimal type info in DataStream is different from table. The precision and scale are not fixed, i.e., not be specified during initialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll remove it.
| * A PickledByteArrayTypeInfo indicates that the data of this type is a generated primitive byte | ||
| * array by pickle. | ||
| */ | ||
| public class PickledByteArrayTypeInfo extends TypeInformation<byte[]> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it a Singleton, similar to BasicTypeInfo in which provides a static final instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, making it singleton is better.
|
Hi @hequn8128 , thank you for your comments, I have updated the pr according to your review suggestions, please have a look at it. |
hequn8128
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More comments below
| field_names = [name for name in j_field_names] | ||
| return field_names | ||
|
|
||
| def get_field_index(self, field_name) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add type hint.
| raise TypeError("Invalid element type for a primitive array.") | ||
|
|
||
|
|
||
| def from_java_type(j_type_info: JavaObject) -> TypeInformation: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Private method. Rename to _from_java_type.
| raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info) | ||
|
|
||
|
|
||
| def is_instance_of(java_object: JavaObject, java_type: Union[JavaObject, JavaClass]) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Private method. Rename to _is_instance_of.
| PICKLED_BYTE_ARRAY = PickledBytesTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO | ||
|
|
||
| @staticmethod | ||
| def ROW(types): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add type hint
| return RowTypeInfo(types) | ||
|
|
||
| @staticmethod | ||
| def ROW_NAMED(names, types): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add type hint
| from pyflink.common.typeinfo import Types, RowTypeInfo | ||
|
|
||
|
|
||
| class TypeInfoTests(unittest.TestCase): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add tests to cover TupleTypeInfo and methods in Types(Types.ROW(), Types.ROW_NAMED, etc).
...thon/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java
Outdated
Show resolved
Hide resolved
...thon/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java
Show resolved
Hide resolved
hequn8128
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shuiqiangchen Thanks a lot for the update. The code looks good overall. Some minor comments below.
| TupleTypeInfo([Types.STRING(), Types.INT()]), True) | ||
|
|
||
| self.assertEqual(TupleTypeInfo([Types.STRING(), Types.INT()]).__str__(), | ||
| "Java Tuple2<String, Integer>") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is strange to return a string with Java for Python typeinfo. We need to regenerate the string for TupleTypeinfo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, maybe we should not directly call to toString() of the java type info object.
|
|
||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the useless empty lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I'll clean it.
| ['a', 'b']), False) | ||
| self.assertEqual(RowTypeInfo([Types.STRING(), | ||
| Types.STRING()], | ||
| ['a', 'b']).__str__(), "RowTypeInfo[String, String]") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also print field name for row type info.
hequn8128
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good from my side. Will merge this if @twalthr does not have any other concerns.
Best,
Hequn
hequn8128
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shuiqiangchen My last comments. Maybe we can remove the methods in the python TypeInformation since we don't need them now. It is always easier to add APIs than delete them. We can add these methods later when we find it is useful.
| def is_basic_type(self) -> bool: | ||
| """ | ||
| Checks if this type information represents a basic type. | ||
| Basic types are defined in BasicTypeInfo and are primitives, their boxing type, Strings ... | ||
|
|
||
| :return: True, if this type information describes a basic type, false otherwise. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def is_tuple_type(self) -> bool: | ||
| """ | ||
| Checks if this type information represents a Tuple type. | ||
|
|
||
| :return: True, if this type information describes a tuple type, false otherwise. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def get_arity(self) -> int: | ||
| """ | ||
| Gets the arity of this type - the number of fields without nesting. | ||
|
|
||
| :return: the number of fields in this type without nesting. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def get_total_fields(self) -> int: | ||
| """ | ||
| Gets the number of logical fields in this type. This includes its nested and transitively | ||
| nested fields, in the case of composite types. | ||
| The total number of fields must be at lest 1. | ||
|
|
||
| :return: The number of fields in this type, including its sub-fields (for composite types). | ||
| """ | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove these methods since we don't need them now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, I will remove them.
| def is_basic_type(self) -> bool: | ||
| return self._j_typeinfo.isBasicType() | ||
|
|
||
| def is_tuple_type(self) -> bool: | ||
| return self._j_typeinfo.isTupleType() | ||
|
|
||
| def get_arity(self) -> int: | ||
| return self._j_typeinfo.getArity() | ||
|
|
||
| def get_total_fields(self) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
@hequn8128 Thank you for your comments, I have updated the code, please take a look at it. |
|
@shuiqiangchen Thanks a lot for the update. Please check the azure test failures. |
What is the purpose of the change
Supports basic TypeInformation including BasicTypeInfo, LocalTimeTypeInfo, PrimitiveArrayTypeInfo, RowTypeInfo.
Types.ROW()/Types.ROW_NAMED()/Types.PRIMITIVE_ARRAY() should also be supported.
Brief change log
Verifying this change
This change is already covered by existing tests, such as:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no no)Documentation