
[FLINK-16996][table] Refactor planner and connectors to use new data structures#11925

Closed
wuchong wants to merge 9 commits into apache:master from wuchong:rowdata-planner

Conversation

@wuchong
Member

@wuchong wuchong commented Apr 27, 2020

What is the purpose of the change

Refactors existing code to use the new data structures interfaces.

Brief change log

The commits, in order:

  • [table-common] Add necessary methods to internal data structures
  • [table-common] Add binary implementations of internal data structures
  • [table-runtime-blink] Implement all the data structures and serializers around RowData
  • [table-runtime-blink] Remove legacy data formats (BaseRow)
  • [table-blink] Refactor planner and runtime to use new data structures
  • [python] Refactor pyflink to use new data structures
  • [parquet] Refactor parquet connector to use new data structures
  • [orc] Refactor ORC connector to use new data structures
  • [hive] Refactor Hive connector to use new data structures

Some notable changes:

  • In code generation, we hard-cast StringData to BinaryStringData. This makes it easy for the code generator to generate operations on strings. The same applies to RawValueData.
  • Most methods of Decimal have been moved to DecimalDataUtils, so I also updated the code generation logic.
  • Remove the RecordEqualiser#equalsWithoutHeader interface. This method is rarely used and can be replaced by RecordEqualiser#equals. This also avoids adding an equalsWithoutHeader method to the public API GenericRowData.
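The hard-cast pattern can be illustrated with a minimal sketch. These are hypothetical stand-in types, not the real Flink classes: generated code receives values typed against the interface but casts them to the binary implementation so it can call methods that only exist there.

```java
public class CastSketch {
    // Stand-in for the public StringData interface.
    public interface StringData {
        String toJavaString();
    }

    // Stand-in for the binary implementation, which exposes extra methods
    // that generated operators rely on after the hard cast.
    public static final class BinaryStringData implements StringData {
        private final String value;

        public BinaryStringData(String value) { this.value = value; }

        public String toJavaString() { return value; }

        // Method available only on the binary implementation.
        public int binaryCompare(BinaryStringData other) {
            return value.compareTo(other.value);
        }
    }

    // Mimics a generated expression: inputs are typed as StringData, but
    // the generated code casts to BinaryStringData to use binaryCompare.
    public static int generatedCompare(StringData a, StringData b) {
        return ((BinaryStringData) a).binaryCompare((BinaryStringData) b);
    }
}
```

The trade-off is that the generator may assume all runtime values are the binary implementation; this is discussed further down in the review thread.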

Verifying this change

This change is covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@wuchong wuchong requested a review from JingsongLi April 27, 2020 16:20
@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit fd62e8e (Mon Apr 27 16:23:50 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Apr 27, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wuchong
Member Author

wuchong commented Apr 28, 2020

Hi @dianfu , could you help review the python part? RowData and ArrayData don't implement TypeGetterSetter, so I had to refactor the ArrowFieldWriter implementations.

Contributor

@KurtYoung KurtYoung left a comment

I've checked the runtime part, LGTM.

@@ -66,4 +69,20 @@ public static boolean byteArrayEquals(
return true;
}

public static String toString(RowData row, LogicalType[] types) {
Contributor

This is only used in tests, move it to tests?

str.ensureMaterialized();

if (precision > Decimal.MAX_LONG_DIGITS || str.getSizeInBytes() > Decimal.MAX_LONG_DIGITS) {
if (DecimalDataUtils.isByteArrayDecimal(precision) || DecimalDataUtils.isByteArrayDecimal(str.getSizeInBytes())) {
Contributor

Is this right? Why do we check both precision and sizeInBytes with the same method?

Member Author

cc @JingsongLi , do you know why we check both of them?

Contributor

@JingsongLi JingsongLi Apr 28, 2020

It is right.
The case is: the precision is 10 (less than MAX_LONG_DIGITS), but the string data may carry more than 10 digits of precision. We can convert via a big decimal, and then trim the extra decimals according to the target precision.
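A rough sketch of that conversion path using plain java.math.BigDecimal (not Flink's DecimalData; the method name here is illustrative): parse the full string first, trim the extra fraction digits by scale, then validate against the target precision.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalParseSketch {
    // Parse a numeric string whose digit count may exceed the target
    // precision: convert via BigDecimal, round away extra fraction digits,
    // then check that the result still fits the declared precision.
    public static BigDecimal parseWithPrecision(String s, int precision, int scale) {
        BigDecimal d = new BigDecimal(s).setScale(scale, RoundingMode.HALF_UP);
        if (d.precision() > precision) {
            throw new NumberFormatException("value exceeds precision " + precision);
        }
        return d;
    }
}
```

For example, a string like "3.14159265" has more digits than a DECIMAL(10, 2) target, but still parses cleanly once rounded to two fraction digits.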

* Converts a {@link MapData} into Java {@link Map}, the keys and values of the Java map
* still holds objects of internal data structures.
*/
public static Map<Object, Object> convertToJavaMap(
Contributor

only used by tests


@dianfu
Contributor

dianfu commented Apr 28, 2020

@wuchong The python part LGTM.

Contributor

@twalthr twalthr left a comment

Feedback for 38946ea.

if (i != 0) {
sb.append(",");
}
sb.append(StringUtils.arrayAwareToString(fields[i]));
Contributor

Use org.apache.flink.table.utils.EncodingUtils#objectToString
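For context, this is roughly what an "array-aware" toString does (a hypothetical mini-version; the suggestion above is to reuse the existing EncodingUtils#objectToString rather than hand-rolling it): arrays get element-wise formatting instead of Java's default type-name-plus-hash-code output.

```java
import java.util.Arrays;

public class ArrayAwareToString {
    // Format arrays element-wise; fall back to String.valueOf for
    // everything else (which also handles null).
    public static String format(Object o) {
        if (o instanceof Object[]) {
            return Arrays.deepToString((Object[]) o);
        }
        if (o instanceof int[]) {
            return Arrays.toString((int[]) o);
        }
        return String.valueOf(o);
    }
}
```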

*
* <p>By default the result type of an evaluation method is determined by Flink's type extraction
* facilities. Currently, only support {@link org.apache.flink.types.Row} and {@code BaseRow} as
* facilities. Currently, only support {@link org.apache.flink.types.Row} and {@code RowData} as
Contributor

use: {@link RowData}

case RAW:
return RawValueData.class;
default:
throw new UnsupportedOperationException("Not support type: " + type);
Contributor

nit: Unsupported type:

}

/**
* Get internal(sql engine execution data formats) conversion class for {@link LogicalType}.
Contributor

nit: Returns the conversion class for the given {@link LogicalType} that is used by the table runtime.

/**
* Get internal(sql engine execution data formats) conversion class for {@link LogicalType}.
*/
public static Class<?> internalConversionClass(LogicalType type) {
Contributor

nit: toInternalConversionClass

Contributor

@twalthr twalthr left a comment

Some feedback to 9517bc9.

What I don't like is that we have a lot of runtime code in flink-table-common now which means it is also available in the API. I'm wondering if we could at least hide some util classes by a default scope visibility. At least we should move all of those utilities to the binary package.

* Precision is not compact: can not call setNullAt when decimal is null, must call
* setDecimal(i, null, precision) because we need update var-length-part.
*/
void setDecimal(int i, DecimalData value, int precision);
Contributor

i => pos
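The compact-precision rule from the quoted JavaDoc can be sketched as follows. The threshold 18 matches the maximum decimal precision that fits into a single long; the constant name is assumed here, not taken from the PR.

```java
public class DecimalCompactSketch {
    // Decimals with precision up to 18 fit into one long and live in the
    // fixed-length part of a binary row, so a plain setNullAt is enough
    // for null values. Larger precisions are stored in the var-length
    // part, which must be updated even for nulls via
    // setDecimal(pos, null, precision).
    public static final int MAX_COMPACT_PRECISION = 18;

    public static boolean isCompact(int precision) {
        return precision <= MAX_COMPACT_PRECISION;
    }
}
```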

void setNullAt(int pos);

/**
* Set boolean value.
Contributor

Remove those JavaDocs. They are not useful and just make the code more complicated.

* <p>Note:
* Precision is compact: can call setNullAt when decimal is null.
* Precision is not compact: can not call setNullAt when decimal is null, must call
* setDecimal(i, null, precision) because we need update var-length-part.
Contributor

nit: use {@code } to format the JavaDoc

/**
* Binary format spanning {@link MemorySegment}s.
*/
public interface BinaryFormat {
Contributor

Missing @Internal

* <p>It can lazy the conversions as much as possible. It will be converted into required form
* only when it is needed.
*/
public abstract class LazyBinaryFormat<T> implements BinaryFormat {
Contributor

Missing @Internal

* Utilities for binary data segments which heavily uses {@link MemorySegment}.
*/
@Internal
public class BinarySegmentUtils {
Contributor

Put this under org.apache.flink.table.data.binary. Make the class final with a private default constructor.

* Murmur Hash. This is inspired by Guava's Murmur3_32HashFunction.
*/
@Internal
public final class MurmurHashUtils {
Contributor

put under org.apache.flink.table.data.binary add private default constructor

* Utilities for String UTF-8.
*/
@Internal
public class StringUtf8Utils {
Contributor

put under org.apache.flink.table.data.binary make final with private default constructor

* used on the binary format such as {@link BinaryRowData}.
*/
@Internal
public interface TypedSetters {
Contributor

If this is mainly used for binary formats, put it in the binary package.

@twalthr
Contributor

twalthr commented Apr 28, 2020

Thanks @wuchong for this massive PR. I took a look at the classes in table-common. Apart from my comments, they look good to me.

@wuchong
Member Author

wuchong commented Apr 28, 2020

Thanks @KurtYoung @dianfu @twalthr for the quick review. I have addressed all the comments.

Hi Timo, currently it's hard to make all classes under the binary package package-private, because they are used in serializers and code generation. But I think MurmurHashUtils and StringUtf8Utils can be, so I updated them.

@Internal
public final class NestedRowData extends BinarySection implements RowData, TypedSetters {

private static final long serialVersionUID = 1L;
Contributor

Remove it.


import java.io.Serializable;

/**
* Record equaliser for BaseRow which can compare two BaseRows and returns whether they are equal.
* Record equaliser for RowData which can compare two RowDatas and returns whether they are equal.
Contributor

nit: RowData has no plural "s" — use "RowData", not "RowDatas".

* Returns a term for representing the given class in Java code.
*/
def typeTerm(clazz: Class[_]): String = {
if (clazz == classOf[StringData]) {
Contributor

Could this put the code generator at risk of a missing cast?
If possible, I'd prefer to cast when accessing its methods.

Contributor

Considering RowData and the others, using StringData seems more reasonable to me.

Member Author

There are too many calls on BinaryStringData in code generation now; if we used StringData in code generation, we would have to refactor a lot of code, and we don't benefit much from that effort. I created FLINK-17437 to track this; we can refactor it in the future.

case None =>
val term = newName("typeSerializer")
val ser = InternalSerializers.create(t, new ExecutionConfig)
val ser = InternalSerializers.createInternalSerializer(t, new ExecutionConfig)
Contributor

createInternalSerializer is a little redundant

@JingsongLi
Contributor

There are still 100+ occurrences of BaseRow in the code (variable names and the like); you could rename them all.

@wuchong
Member Author

wuchong commented Apr 28, 2020

Thanks for the review @JingsongLi . I have addressed the comments and renamed the legacy field and method names that used BaseRow, SqlTimestamp and BinaryGeneric.

Contributor

@JingsongLi JingsongLi left a comment

Thanks @wuchong , looks good to me from my side.

@wuchong
Member Author

wuchong commented Apr 29, 2020

Thanks all for the review. I have rebased the commits. Will merge this once the build passes.

@JingsongLi
Contributor

Hi @wuchong , do you want to squash or not? My question is:
does every commit on master need to compile?

@wuchong
Member Author

wuchong commented Apr 29, 2020

Hi @JingsongLi , I want to keep the split commits. From my point of view, squashing such a large PR into one commit is not good.

@JingsongLi
Contributor

Hi @JingsongLi , I want to keep the split commits. From my point of view, squashing such a large PR into one commit is not good.

So the answer is that we may push commits with broken compilation.

@wuchong
Member Author

wuchong commented Apr 29, 2020

@JingsongLi , as far as I know, the community doesn't have a rule that every individual commit must pass the build, but there is a rule to split a big change into separate commits:

https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html#separate-refactoring-cleanup-and-independent-changes

@wuchong
Member Author

wuchong commented Apr 29, 2020

@JingsongLi
Contributor

@wuchong go you.

@wuchong wuchong closed this in 2296487 Apr 29, 2020
wuchong added a commit that referenced this pull request Apr 29, 2020
@wuchong wuchong deleted the rowdata-planner branch April 29, 2020 06:16