
[FLINK-12588][python] Add TableSchema for Python Table API. #8561

Closed
wants to merge 8 commits into from

Conversation

WeiZhong94
Contributor

What is the purpose of the change

This pull request adds a TableSchema for the Python Table API. To this end, a _to_python_type function is introduced, which converts Java DataType and TypeInformation objects into Python DataType objects. To ensure that _to_python_type and the existing _to_java_type are mutual inverses, this PR also makes some changes to the Flink Python type system.

Brief change log

  • Add the TableSchema class.
  • Add get_schema method in Table class.
  • Add schema method in OldCsv and Schema class.
  • Add _to_python_type function.
  • Changed several default values and default behaviors of the current data types.

Verifying this change

Added integration tests in test_table_schema.py, test_schema_operation.py and test_types.py.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (python docs)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

Contributor

@dianfu dianfu left a comment


@WeiZhong94 Thanks a lot for the PR. Have left a few comments.


def get_field_types(self):
"""
Returns all field data types as an array.

This method is deprecated in Java, so there is no need to add it in Python.

else:
return None

def get_field_type(self, field):

Deprecated in Java, so we can remove it in Python.

A table schema that represents a table's structure with field names and data types.
"""

def __init__(self, field_names=None, data_types=None, java_object=None):

java_object -> j_table_schema


def get_field_data_types(self):
"""
Returns all field data types as an array.

as a list

Returns the specified data type for the given field index or field name.

:param field: The index of the field or the name of the field.
:return: The specified data type.

The data type of the specified field

logical_type = java_data_type.getLogicalType()
conversion_clz = java_data_type.getConversionClass()
if is_instance_of(logical_type, gateway.jvm.CharType):
python_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())

python_type -> data_type

"currently." % java_data_type_input)
elif is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType):
raise \
TypeError("Not supported type: %s, DayTimeIntervalType is not supported currently."

currently -> yet

python_type = DataTypes.TIME(logical_type.isNullable())
elif is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
raise \
TypeError("Not supported type: %s, ZonedTimestampType is not supported currently."

TypeError("ZonedTimestampType is not supported yet").

python_type = DataTypes.MULTISET(_to_python_type(element_type),
logical_type.isNullable())
else:
raise TypeError("Not supported colletion data type: %s" % java_data_type_input)

colletion -> collection


# Unrecognized type.
else:
TypeError("Unsupported data type: %s" % java_data_type_input)

What about changing all the error messages to "Unsupported data type: %s"?
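For illustration, the isinstance-dispatch pattern used by _to_python_type can be sketched without py4j. The classes below are local stand-ins for the JVM logical types reached through the gateway, not the actual pyflink implementation, and the uniform error message follows the suggestion above:

```python
# Simplified sketch of the isinstance-dispatch pattern used by
# _to_python_type. CharType and TimeType here are local stand-ins
# for the JVM logical-type classes reached through the py4j gateway.

class LogicalType:
    def __init__(self, nullable=True):
        self.nullable = nullable

    def isNullable(self):
        return self.nullable

class CharType(LogicalType):
    def __init__(self, length, nullable=True):
        super().__init__(nullable)
        self.length = length

    def getLength(self):
        return self.length

class TimeType(LogicalType):
    pass

def to_python_type(logical_type):
    # Dispatch on the concrete logical type; unknown types get the
    # uniform "Unsupported data type" error message.
    if isinstance(logical_type, CharType):
        return ("CHAR", logical_type.getLength(), logical_type.isNullable())
    elif isinstance(logical_type, TimeType):
        return ("TIME", logical_type.isNullable())
    else:
        raise TypeError("Unsupported data type: %s" % logical_type)
```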

@WeiZhong94
Contributor Author

@dianfu Thanks for your review! I have updated the PR according to your comments.

Member

@sunjincheng121 sunjincheng121 left a comment


Thanks for the PR! @WeiZhong94!
I only left 3 suggestions about the Python docs. And one reminder: before opening a PR, we should run flink-python/dev/lint-python.sh to run the test cases and check the code format.
Best,
Jincheng

@@ -532,6 +533,9 @@ def insert_into(self, table_path, *table_path_continued):
j_table_path = to_jarray(gateway.jvm.String, table_path_continued)
self._j_table.insertInto(table_path, j_table_path)

def get_schema(self):
return TableSchema(java_object=self._j_table.getSchema())

Add a Python doc such as "Returns the schema of this table."?

@@ -177,6 +177,10 @@ def __init__(self):
self._j_schema = gateway.jvm.Schema()
super(Schema, self).__init__(self._j_schema)

def schema(self, table_schema):
self._j_schema = self._j_schema.schema(table_schema._j_table_schema)

Add a Python doc aligned with the Java doc, such as:

	Sets the schema with field names and the types. Required.
	This method overwrites existing fields added with ...

@@ -285,6 +289,10 @@ def line_delimiter(self, delimiter):
self._j_csv = self._j_csv.lineDelimiter(delimiter)
return self

def schema(self, schema):
self._j_csv = self._j_csv.schema(schema._j_table_schema)

I find that the Java side has the following Javadoc:

 /**
    * Sets the format schema with field names and the types. Required.
    * The table schema must not contain nested fields.
    *
    * This method overwrites existing fields added with [[field()]].
    *
    * @param schema the table schema
    */

It's better to align the doc. What do you think?
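For reference, a Python docstring aligned with the quoted Javadoc might look as follows. This is only a sketch on a simplified stand-in class; the real OldCsv descriptor lives in pyflink, and the exact wording is up to the author:

```python
# Sketch of an OldCsv stand-in whose schema() docstring is aligned
# with the Javadoc quoted above. Not the actual pyflink class.

class OldCsv:
    def schema(self, table_schema):
        """
        Sets the format schema with field names and the types. Required.
        The table schema must not contain nested fields.

        This method overwrites existing fields added with :func:`field`.

        :param table_schema: The :class:`TableSchema` object.
        """
        self._table_schema = table_schema
        return self
```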

data_type = DataTypes.VARBINARY(logical_type.getLength(), logical_type.isNullable())
elif _is_instance_of(logical_type, gateway.jvm.DecimalType):
data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
logical_type.getScale(),

continuation line over-indented for visual indent, please correct the format.

if kind is None:
raise Exception("Unsupported java timestamp kind %s" % j_kind)
data_type = DataTypes.TIMESTAMP(kind,
logical_type.getPrecision(),

Same as above.

data_type = DataTypes.ARRAY(_from_java_type(element_type), logical_type.isNullable())
elif _is_instance_of(logical_type, gateway.jvm.MultisetType):
data_type = DataTypes.MULTISET(_from_java_type(element_type),
logical_type.isNullable())

Same as above.

@@ -573,7 +573,7 @@ under the License.
<pattern>py4j</pattern>
<shadedPattern>org.apache.flink.api.python.py4j</shadedPattern>
<includes>
<include>py4j.*</include>
<include>py4j.*.*</include>

Why do we need to add this change?

Contributor Author


Yes, without this change py4j is not shaded completely.

Member


Maybe net.sf.py4j:* is correct, and FLINK-12409 will correct this change.

Contributor Author


Yes, the solution in FLINK-12409 makes more sense. I have removed this change in the new commit, which will cause the CI test to fail for the moment. I will rebase this PR after FLINK-12409 is merged, which will solve this problem.

@sunjincheng121
Member

#8474 has been merged, please rebase the PR! Thanks! :)

Contributor

@dianfu dianfu left a comment


@WeiZhong94 Thanks a lot for the update. I have left a few comments.

This method overwrites existing fields added with
:func:`~pyflink.table.table_descriptor.Schema.field`.

:param schema: The :class:`TableSchema` object.

schema -> table_schema

@@ -287,6 +300,19 @@ def line_delimiter(self, delimiter):
self._j_csv = self._j_csv.lineDelimiter(delimiter)
return self

def schema(self, schema):

What about changing the argument name to table_schema to be consistent with the method Schema.schema?

@@ -113,7 +113,7 @@ def test_from_element(self):
DataTypes.STRING(), DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIMESTAMP(),
DataTypes.ARRAY(DataTypes.DOUBLE()),
DataTypes.ARRAY(DataTypes.DOUBLE().not_null()),

Could you add a test case for input element [1.0, None]?

class TableSchemaTests(PyFlinkTestCase):

def test_init(self):
schema = \

There is no need to add a new line here.

@@ -389,9 +405,10 @@ def __init__(self, precision=0, nullable=True):
super(TimeType, self).__init__(nullable)
assert 0 <= precision <= 9
self.precision = precision
self.bridged_to("java.time.LocalTime")

What about reverting this kind of change?


@classmethod
def TIMESTAMP(cls, kind=TimestampKind.REGULAR, precision=6, nullable=True):
return TimestampType(kind, precision, nullable)
def TIMESTAMP(cls, kind=TimestampKind.REGULAR, precision=3, nullable=True):

Revert this change?

@WeiZhong94
Contributor Author

@dianfu Thanks for your review! I have addressed your comments.

@sunjincheng121
Member

+1 to merge.

sunjincheng121 pushed a commit to sunjincheng121/flink that referenced this pull request Jun 9, 2019
@asfgit asfgit closed this in 8eaa2d0 Jun 9, 2019
sjwiesman pushed a commit to sjwiesman/flink that referenced this pull request Jun 26, 2019